Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
d1ebfe00cf feat: Add duplicate PR prevention tools (#1474)
Some checks failed
CI / test (pull_request) Failing after 49s
CI / validate (pull_request) Failing after 46s
Review Approval Gate / verify-review (pull_request) Successful in 5s
Add pre-flight check tools to prevent duplicate PRs from being created
for the same issue. This addresses the irony of creating duplicate PRs
for issue #1128 which was about cleaning up duplicate PRs.

Changes:
- Added scripts/check-existing-prs.sh - Bash script to check for existing PRs
- Added scripts/check_existing_prs.py - Python version of the check
- Added scripts/pr-safe.sh - User-friendly wrapper with guidance
- Added docs/duplicate-pr-prevention.md - Documentation and usage guide

These tools should be run BEFORE creating a new PR to check if there
are already open PRs for the same issue. This prevents the ironic
situation of creating duplicate PRs while trying to clean up duplicate PRs.

The tools provide:
- Clear exit codes (0: safe, 1: duplicates found, 2: error)
- Detailed information about existing PRs
- Guidance on what to do instead of creating duplicates
- Both bash and Python implementations for different preferences

Closes #1474
2026-04-14 19:00:10 -04:00
25 changed files with 442 additions and 1868 deletions

View File

@@ -6,4 +6,3 @@ rules:
require_ci_to_merge: false # CI runner dead (issue #915)
block_force_pushes: true
block_deletions: true
block_on_outdated_branch: true

View File

@@ -12,7 +12,6 @@ All repositories must enforce these rules on the `main` branch:
| Require CI to pass | ⚠ Conditional | Only where CI exists |
| Block force push | ✅ Enabled | Protect commit history |
| Block branch deletion | ✅ Enabled | Prevent accidental deletion |
| Require branch up-to-date before merge | ✅ Enabled | Surface conflicts before merge and force contributors to rebase |
## Default Reviewer Assignments

View File

@@ -285,49 +285,6 @@ class AgentMemory:
logger.warning(f"Failed to store memory: {e}")
return None
def remember_alexander_request_response(
self,
*,
request_text: str,
response_text: str,
requester: str = "Alexander Whitestone",
source: str = "",
metadata: Optional[dict] = None,
) -> Optional[str]:
"""Store an Alexander request + wizard response artifact in the sovereign room."""
if not self._check_available():
logger.warning("Cannot store Alexander artifact — MemPalace unavailable")
return None
try:
from nexus.mempalace.searcher import add_memory
from nexus.mempalace.conversation_artifacts import build_request_response_artifact
artifact = build_request_response_artifact(
requester=requester,
responder=self.agent_name,
request_text=request_text,
response_text=response_text,
source=source,
)
extra_metadata = dict(artifact.metadata)
if metadata:
extra_metadata.update(metadata)
doc_id = add_memory(
text=artifact.text,
room=artifact.room,
wing=self.wing,
palace_path=self.palace_path,
source_file=source,
extra_metadata=extra_metadata,
)
logger.debug("Stored Alexander request/response artifact in sovereign room")
return doc_id
except Exception as e:
logger.warning(f"Failed to store Alexander artifact: {e}")
return None
def write_diary(
self,
summary: Optional[str] = None,

8
app.js
View File

@@ -714,10 +714,6 @@ async function init() {
camera = new THREE.PerspectiveCamera(65, window.innerWidth / window.innerHeight, 0.1, 1000);
camera.position.copy(playerPos);
// Initialize avatar and LOD systems
if (window.AvatarCustomization) window.AvatarCustomization.init(scene, camera);
if (window.LODSystem) window.LODSystem.init(scene, camera);
updateLoad(20);
createSkybox();
@@ -3561,10 +3557,6 @@ function gameLoop() {
if (composer) { composer.render(); } else { renderer.render(scene, camera); }
// Update avatar and LOD systems
if (window.AvatarCustomization && playerPos) window.AvatarCustomization.update(playerPos);
if (window.LODSystem && playerPos) window.LODSystem.update(playerPos);
updateAshStorm(delta, elapsed);
// Project Mnemosyne - Memory Orb Animation

View File

@@ -0,0 +1,136 @@
# Duplicate PR Prevention Tools
## Problem
Despite having tools to detect and clean up duplicate PRs, agents were still creating duplicate PRs for the same issue. This was incredibly ironic, especially for issue #1128 which was about cleaning up duplicate PRs.
## Solution
We've created pre-flight check tools that agents should run **before** creating a new PR. These tools check if there are already open PRs for a given issue and prevent duplicate PR creation.
## Tools
### 1. `check-existing-prs.sh` (Bash)
Check if PRs already exist for an issue.
```bash
# Check for existing PRs for issue #1128
./scripts/check-existing-prs.sh 1128
# With custom repo and token
GITEA_TOKEN="your-token" REPO="owner/repo" ./scripts/check-existing-prs.sh 1128
```
**Exit codes:**
- `0`: No existing PRs found (safe to create new PR)
- `1`: Existing PRs found (do not create new PR)
- `2`: Error (API failure, missing parameters, etc.)
### 2. `check_existing_prs.py` (Python)
Python version of the check, suitable for agents that prefer Python.
```bash
# Check for existing PRs for issue #1128
python3 scripts/check_existing_prs.py 1128
# With custom repo and token
python3 scripts/check_existing_prs.py 1128 "owner/repo" "your-token"
```
### 3. `pr-safe.sh` (Wrapper)
User-friendly wrapper that provides guidance.
```bash
# Check and get suggestions
./scripts/pr-safe.sh 1128
# With suggested branch name
./scripts/pr-safe.sh 1128 fix/1128-my-fix
```
## Workflow Integration
### For Agents
Before creating a PR, agents should:
1. Run the check: `./scripts/check-existing-prs.sh <issue_number>`
2. If exit code is `0`, proceed with PR creation
3. If exit code is `1`, review existing PRs instead
### For Humans
Before creating a PR:
1. Run: `./scripts/pr-safe.sh <issue_number>`
2. Follow the guidance provided
## Prevention Strategy
### 1. Pre-flight Checks
Always run a pre-flight check before creating a PR:
```bash
# In your agent workflow
if ./scripts/check-existing-prs.sh $ISSUE_NUMBER; then
# Safe to create PR
create_pr
else
# Don't create PR, review existing ones
review_existing_prs
fi
```
### 2. GitHub Actions Integration
The existing `.github/workflows/pr-duplicate-check.yml` workflow can be enhanced to run these checks automatically.
### 3. Agent Instructions
Add to agent instructions:
```
Before creating a PR for an issue, ALWAYS run:
./scripts/check-existing-prs.sh <issue_number>
If PRs already exist, DO NOT create a new PR.
Instead, review existing PRs and add comments or merge them.
```
## Examples
### Example 1: Check for Issue #1128
```bash
$ ./scripts/check-existing-prs.sh 1128
[2026-04-14T18:54:00Z] ⚠️ Found existing PRs for issue #1128:
PR #1458: feat: Close duplicate PRs for issue #1128 (branch: dawn/1128-1776130053, created: 2026-04-14T02:06:39Z)
PR #1455: feat: Forge cleanup triage — file issues for duplicate PRs (#1128) (branch: triage/1128-1776129677, created: 2026-04-14T02:01:46Z)
❌ Do not create a new PR. Review existing PRs first.
```
### Example 2: Safe to Create PR
```bash
$ ./scripts/check-existing-prs.sh 9999
[2026-04-14T18:54:00Z] ✅ No existing PRs found for issue #9999
Safe to create a new PR
```
## Related Issues
- Issue #1474: [META] Still creating duplicate PRs for issue #1128 despite cleanup
- Issue #1460: [META] I keep creating duplicate PRs for issue #1128
- Issue #1128: [RESOLVED] Forge Cleanup — PRs Closed, Milestones Deduplicated, Policy Issues Filed
## Lessons Learned
1. **Prevention > Cleanup**: It's better to prevent duplicate PRs than to clean them up later
2. **Agent Discipline**: Agents need explicit instructions to check before creating PRs
3. **Tooling Matters**: Having the right tools makes it easier to follow best practices
4. **Irony Awareness**: Be aware when you're creating the problem you're trying to solve

View File

@@ -395,9 +395,6 @@
<div id="memory-connections-panel" class="memory-connections-panel" style="display:none;" aria-label="Memory Connections Panel"></div>
<script src="./boot.js"></script>
<script src="./js/spatial-audio-chat.js"></script>
<script src="./avatar-customization.js"></script>
<script src="./lod-system.js"></script>
<script>
function openMemoryFilter() { renderFilterList(); document.getElementById('memory-filter').style.display = 'flex'; }
function closeMemoryFilter() { document.getElementById('memory-filter').style.display = 'none'; }

View File

@@ -1,337 +0,0 @@
/**
* 3D Audio Spatial Chat
* Issue #1544: feat: 3D audio spatial chat — volume based on distance
*
* Provides spatial audio awareness so near users are louder.
*/
class SpatialAudioChat {
constructor(options = {}) {
this.maxDistance = options.maxDistance || 50; // Maximum hearing distance in units
this.minVolume = options.minVolume || 0.1; // Minimum volume (10%)
this.maxVolume = options.maxVolume || 1.0; // Maximum volume (100%)
this.enabled = options.enabled !== undefined ? options.enabled : true;
// Audio context for spatial audio
this.audioContext = null;
this.listener = null;
// Track user positions
this.userPositions = new Map();
this.localUserId = options.localUserId || 'local';
// Initialize audio context
this.initAudioContext();
}
/**
* Initialize Web Audio API context
*/
initAudioContext() {
try {
// Create audio context
this.audioContext = new (window.AudioContext || window.webkitAudioContext)();
// Create audio listener (represents the local user's ears)
this.listener = this.audioContext.listener;
// Set default listener position
if (this.listener.positionX) {
// Modern API
this.listener.positionX.value = 0;
this.listener.positionY.value = 0;
this.listener.positionZ.value = 0;
} else {
// Fallback for older browsers
this.listener.setPosition(0, 0, 0);
}
console.log('[SpatialAudio] Audio context initialized');
} catch (error) {
console.error('[SpatialAudio] Failed to initialize audio context:', error);
}
}
/**
* Update local user position (the listener)
*/
updateLocalPosition(x, y, z) {
if (!this.listener) return;
if (this.listener.positionX) {
// Modern API
this.listener.positionX.value = x;
this.listener.positionY.value = y;
this.listener.positionZ.value = z;
} else {
// Fallback
this.listener.setPosition(x, y, z);
}
// Store local position
this.userPositions.set(this.localUserId, { x, y, z });
}
/**
* Update remote user position
*/
updateRemoteUserPosition(userId, x, y, z) {
this.userPositions.set(userId, { x, y, z });
}
/**
* Calculate distance between two 3D points
*/
calculateDistance(pos1, pos2) {
const dx = pos1.x - pos2.x;
const dy = pos1.y - pos2.y;
const dz = pos1.z - pos2.z;
return Math.sqrt(dx * dx + dy * dy + dz * dz);
}
/**
* Calculate volume based on distance
* Uses inverse square law for realistic falloff
*/
calculateVolume(distance) {
if (!this.enabled) return this.maxVolume;
if (distance >= this.maxDistance) {
return this.minVolume;
}
if (distance <= 0) {
return this.maxVolume;
}
// Inverse square law: volume = 1 / (distance^2)
// Normalize to [minVolume, maxVolume] range
const normalizedDistance = distance / this.maxDistance;
const volume = this.maxVolume / (1 + normalizedDistance * normalizedDistance * 10);
// Clamp to min/max
return Math.max(this.minVolume, Math.min(this.maxVolume, volume));
}
/**
* Get volume for a specific user based on their distance
*/
getVolumeForUser(userId) {
const localPos = this.userPositions.get(this.localUserId);
const remotePos = this.userPositions.get(userId);
if (!localPos || !remotePos) {
return this.maxVolume; // Default to max if positions unknown
}
const distance = this.calculateDistance(localPos, remotePos);
return this.calculateVolume(distance);
}
/**
* Get spatial audio parameters for a user
*/
getSpatialAudioParams(userId) {
const localPos = this.userPositions.get(this.localUserId);
const remotePos = this.userPositions.get(userId);
if (!localPos || !remotePos) {
return {
volume: this.maxVolume,
distance: 0,
pan: 0,
enabled: false
};
}
const distance = this.calculateDistance(localPos, remotePos);
const volume = this.calculateVolume(distance);
// Calculate pan (left-right balance) based on relative X position
const dx = remotePos.x - localPos.x;
const pan = Math.max(-1, Math.min(1, dx / this.maxDistance));
return {
volume,
distance,
pan,
enabled: this.enabled && distance < this.maxDistance
};
}
/**
* Create spatial audio source for a user
*/
createSpatialAudioSource(userId, audioElement) {
if (!this.audioContext) {
console.warn('[SpatialAudio] Audio context not initialized');
return null;
}
try {
// Create media element source
const source = this.audioContext.createMediaElementSource(audioElement);
// Create panner for 3D positioning
const panner = this.audioContext.createPanner();
panner.panningModel = 'HRTF'; // Head-Related Transfer Function for realistic 3D
panner.distanceModel = 'inverse';
panner.refDistance = 1;
panner.maxDistance = this.maxDistance;
panner.rolloffFactor = 1;
// Create gain node for volume control
const gainNode = this.audioContext.createGain();
// Connect: source -> gain -> panner -> destination
source.connect(gainNode);
gainNode.connect(panner);
panner.connect(this.audioContext.destination);
// Update position based on user data
this.updateAudioSourcePosition(userId, panner, gainNode);
return {
source,
panner,
gainNode,
updatePosition: () => this.updateAudioSourcePosition(userId, panner, gainNode)
};
} catch (error) {
console.error('[SpatialAudio] Failed to create spatial audio source:', error);
return null;
}
}
/**
* Update audio source position based on user position
*/
updateAudioSourcePosition(userId, panner, gainNode) {
const params = this.getSpatialAudioParams(userId);
if (panner.positionX) {
// Modern API
const pos = this.userPositions.get(userId) || { x: 0, y: 0, z: 0 };
panner.positionX.value = pos.x;
panner.positionY.value = pos.y;
panner.positionZ.value = pos.z;
}
// Update volume
gainNode.gain.value = params.volume;
}
/**
* Play chat notification with spatial audio
*/
playChatNotification(userId, audioUrl) {
if (!this.enabled) {
// Just play normally
const audio = new Audio(audioUrl);
audio.volume = this.maxVolume;
audio.play();
return;
}
const volume = this.getVolumeForUser(userId);
const audio = new Audio(audioUrl);
audio.volume = volume;
console.log(`[SpatialAudio] Playing notification for ${userId} at volume ${volume.toFixed(2)}`);
audio.play().catch(error => {
console.error('[SpatialAudio] Failed to play audio:', error);
});
}
/**
* Get hearing distance for UI display
*/
getHearingDistance() {
return this.maxDistance;
}
/**
* Set maximum hearing distance
*/
setHearingDistance(distance) {
if (typeof distance !== 'number' || distance < 1) {
console.warn('[SpatialAudio] Invalid hearing distance:', distance);
return;
}
this.maxDistance = distance;
console.log(`[SpatialAudio] Max hearing distance set to ${this.maxDistance}`);
}
/**
* Enable/disable spatial audio
*/
setEnabled(enabled) {
this.enabled = enabled;
console.log(`[SpatialAudio] ${enabled ? 'Enabled' : 'Disabled'}`);
}
/**
* Get current status
*/
getStatus() {
return {
enabled: this.enabled,
maxDistance: this.maxDistance,
minVolume: this.minVolume,
maxVolume: this.maxVolume,
userCount: this.userPositions.size,
audioContextState: this.audioContext ? this.audioContext.state : 'not initialized'
};
}
/**
* Clean up resources
*/
destroy() {
if (this.audioContext) {
this.audioContext.close();
this.audioContext = null;
}
this.userPositions.clear();
console.log('[SpatialAudio] Destroyed');
}
}
// Export for use in other modules
if (typeof module !== 'undefined' && module.exports) {
module.exports = SpatialAudioChat;
}
// Global instance for browser use
if (typeof window !== 'undefined') {
window.SpatialAudioChat = SpatialAudioChat;
// Auto-initialize if chat panel exists
document.addEventListener('DOMContentLoaded', () => {
const chatPanel = document.getElementById('chat-panel');
if (chatPanel) {
const spatialAudio = new SpatialAudioChat({
maxDistance: 50,
enabled: true
});
// Store globally for access
window.spatialAudio = spatialAudio;
// Update local position when user moves
if (window.updateUserPosition) {
const originalUpdate = window.updateUserPosition;
window.updateUserPosition = function(x, y, z) {
originalUpdate(x, y, z);
spatialAudio.updateLocalPosition(x, y, z);
};
}
}
});
}

View File

@@ -1,186 +0,0 @@
/**
* LOD (Level of Detail) System for The Nexus
*
* Optimizes rendering when many avatars/users are visible:
* - Distance-based LOD: far users become billboard sprites
* - Occlusion: skip rendering users behind walls
* - Budget: maintain 60 FPS target with 50+ avatars
*
* Usage:
* LODSystem.init(scene, camera);
* LODSystem.registerAvatar(avatarMesh, userId);
* LODSystem.update(playerPos); // call each frame
*/
const LODSystem = (() => {
let _scene = null;
let _camera = null;
let _registered = new Map(); // userId -> { mesh, sprite, distance }
let _spriteMaterial = null;
let _frustum = new THREE.Frustum();
let _projScreenMatrix = new THREE.Matrix4();
// Thresholds
const LOD_NEAR = 15; // Full mesh within 15 units
const LOD_FAR = 40; // Billboard beyond 40 units
const LOD_CULL = 80; // Don't render beyond 80 units
const SPRITE_SIZE = 1.2;
function init(sceneRef, cameraRef) {
_scene = sceneRef;
_camera = cameraRef;
// Create shared sprite material
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
// Simple avatar indicator: colored circle
ctx.fillStyle = '#00ffcc';
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2); // head
ctx.fill();
const texture = new THREE.CanvasTexture(canvas);
_spriteMaterial = new THREE.SpriteMaterial({
map: texture,
transparent: true,
depthTest: true,
sizeAttenuation: true,
});
console.log('[LODSystem] Initialized');
}
function registerAvatar(avatarMesh, userId, color) {
// Create billboard sprite for this avatar
const spriteMat = _spriteMaterial.clone();
if (color) {
// Tint sprite to match avatar color
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
ctx.fillStyle = color;
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2);
ctx.fill();
spriteMat.map = new THREE.CanvasTexture(canvas);
spriteMat.map.needsUpdate = true;
}
const sprite = new THREE.Sprite(spriteMat);
sprite.scale.set(SPRITE_SIZE, SPRITE_SIZE, 1);
sprite.visible = false;
_scene.add(sprite);
_registered.set(userId, {
mesh: avatarMesh,
sprite: sprite,
distance: Infinity,
});
}
function unregisterAvatar(userId) {
const entry = _registered.get(userId);
if (entry) {
_scene.remove(entry.sprite);
entry.sprite.material.dispose();
_registered.delete(userId);
}
}
function setSpriteColor(userId, color) {
const entry = _registered.get(userId);
if (!entry) return;
const canvas = document.createElement('canvas');
canvas.width = 64;
canvas.height = 64;
const ctx = canvas.getContext('2d');
ctx.fillStyle = color;
ctx.beginPath();
ctx.arc(32, 32, 20, 0, Math.PI * 2);
ctx.fill();
ctx.fillStyle = '#0a0f1a';
ctx.beginPath();
ctx.arc(32, 28, 8, 0, Math.PI * 2);
ctx.fill();
entry.sprite.material.map = new THREE.CanvasTexture(canvas);
entry.sprite.material.map.needsUpdate = true;
}
function update(playerPos) {
if (!_camera) return;
// Update frustum for culling
_projScreenMatrix.multiplyMatrices(
_camera.projectionMatrix,
_camera.matrixWorldInverse
);
_frustum.setFromProjectionMatrix(_projScreenMatrix);
_registered.forEach((entry, userId) => {
if (!entry.mesh) return;
const meshPos = entry.mesh.position;
const distance = playerPos.distanceTo(meshPos);
entry.distance = distance;
// Beyond cull distance: hide everything
if (distance > LOD_CULL) {
entry.mesh.visible = false;
entry.sprite.visible = false;
return;
}
// Check if in camera frustum
const inFrustum = _frustum.containsPoint(meshPos);
if (!inFrustum) {
entry.mesh.visible = false;
entry.sprite.visible = false;
return;
}
// LOD switching
if (distance <= LOD_NEAR) {
// Near: full mesh
entry.mesh.visible = true;
entry.sprite.visible = false;
} else if (distance <= LOD_FAR) {
// Mid: mesh with reduced detail (keep mesh visible)
entry.mesh.visible = true;
entry.sprite.visible = false;
} else {
// Far: billboard sprite
entry.mesh.visible = false;
entry.sprite.visible = true;
entry.sprite.position.copy(meshPos);
entry.sprite.position.y += 1.2; // above avatar center
}
});
}
function getStats() {
let meshCount = 0;
let spriteCount = 0;
let culledCount = 0;
_registered.forEach(entry => {
if (entry.mesh.visible) meshCount++;
else if (entry.sprite.visible) spriteCount++;
else culledCount++;
});
return { total: _registered.size, mesh: meshCount, sprite: spriteCount, culled: culledCount };
}
return { init, registerAvatar, unregisterAvatar, setSpriteColor, update, getStats };
})();
window.LODSystem = LODSystem;

View File

@@ -62,15 +62,6 @@ core_rooms:
- proof-of-concept code snippets
- benchmark data
- key: sovereign
label: Sovereign
purpose: Artifacts of Alexander Whitestone's requests, directives, and wizard responses
examples:
- dated request/response artifacts
- conversation summaries with speaker tags
- directive ledgers
- response follow-through notes
optional_rooms:
- key: evennia
label: Evennia
@@ -107,6 +98,15 @@ optional_rooms:
purpose: Catch-all for artefacts not yet assigned to a named room
wizards: ["*"]
- key: sovereign
label: Sovereign
purpose: Artifacts of Alexander Whitestone's requests, directives, and conversation history
wizards: ["*"]
conventions:
naming: "YYYY-MM-DD_HHMMSS_<topic>.md"
index: "INDEX.md"
description: "Each artifact is a dated record of a request from Alexander and the wizard's response. The running INDEX.md provides a chronological catalog."
# Tunnel routing table
# Defines which room pairs are connected across wizard wings.
# A tunnel lets `recall <query> --fleet` search both wings at once.

View File

@@ -14,7 +14,6 @@ from nexus.perception_adapter import (
)
from nexus.experience_store import ExperienceStore
from nexus.trajectory_logger import TrajectoryLogger
from nexus.chronicle import ChronicleWriter, AgentEvent, EventKind
try:
from nexus.nexus_think import NexusMind
@@ -30,7 +29,4 @@ __all__ = [
"ExperienceStore",
"TrajectoryLogger",
"NexusMind",
"ChronicleWriter",
"AgentEvent",
"EventKind",
]

View File

@@ -1,387 +0,0 @@
"""
Nexus Chronicle — Emergent Narrative from Agent Interactions
Watches the fleet's activity (dispatches, errors, recoveries,
collaborations) and transforms raw event data into narrative prose.
The system finds the dramatic arc in real work and produces a living
chronicle. The story writes itself from the data.
Usage:
from nexus.chronicle import ChronicleWriter, AgentEvent, EventKind
writer = ChronicleWriter()
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="took issue #42"))
writer.ingest(AgentEvent(kind=EventKind.ERROR, agent="claude", detail="rate limit hit"))
writer.ingest(AgentEvent(kind=EventKind.RECOVERY, agent="claude", detail="retried after backoff"))
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="feat: add narrative engine"))
prose = writer.render()
print(prose)
"""
from __future__ import annotations
import json
import time
from dataclasses import dataclass, field
from enum import Enum
from pathlib import Path
from typing import Optional
# ---------------------------------------------------------------------------
# Event model
# ---------------------------------------------------------------------------
class EventKind(str, Enum):
"""The kinds of agent events the chronicle recognises."""
DISPATCH = "dispatch" # agent claimed / was assigned work
COMMIT = "commit" # agent produced a commit
PUSH = "push" # agent pushed a branch
PR_OPEN = "pr_open" # agent opened a pull request
PR_MERGE = "pr_merge" # PR was merged
ERROR = "error" # agent hit an error / exception
RECOVERY = "recovery" # agent recovered from a failure
ABANDON = "abandon" # agent abandoned a task (timeout / giving up)
COLLABORATION = "collab" # two agents worked on the same thing
HEARTBEAT = "heartbeat" # agent reported a heartbeat (alive signal)
IDLE = "idle" # agent is waiting for work
MILESTONE = "milestone" # notable achievement (e.g. 100th issue closed)
@dataclass
class AgentEvent:
"""One discrete thing that happened in the fleet."""
kind: EventKind
agent: str # who did this (e.g. "claude", "mimo-v2-pro")
detail: str = "" # free-text description
timestamp: float = field(default_factory=time.time)
metadata: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return {
"kind": self.kind.value,
"agent": self.agent,
"detail": self.detail,
"timestamp": self.timestamp,
"metadata": self.metadata,
}
@classmethod
def from_dict(cls, data: dict) -> "AgentEvent":
return cls(
kind=EventKind(data["kind"]),
agent=data["agent"],
detail=data.get("detail", ""),
timestamp=data.get("timestamp", time.time()),
metadata=data.get("metadata", {}),
)
# ---------------------------------------------------------------------------
# Narrative templates — maps event kinds to prose fragments
# ---------------------------------------------------------------------------
# Each entry is a list so we can rotate through variants.
_TEMPLATES: dict[EventKind, list[str]] = {
EventKind.DISPATCH: [
"{agent} stepped forward and claimed the work: {detail}.",
"{agent} took on the challenge — {detail}.",
"The task landed on {agent}'s desk: {detail}.",
],
EventKind.COMMIT: [
'{agent} sealed a commit into the record: "{detail}".',
'{agent} committed "{detail}" — progress crystallised.',
"{agent} carved a new ring into the trunk: {detail}.",
],
EventKind.PUSH: [
"{agent} pushed the work upstream.",
"The branch rose into the forge — {agent}'s changes were live.",
"{agent} sent their work into the wider current.",
],
EventKind.PR_OPEN: [
"{agent} opened a pull request: {detail}.",
"A proposal surfaced — {agent} asked the fleet to review {detail}.",
"{agent} laid their work before the reviewers: {detail}.",
],
EventKind.PR_MERGE: [
"{agent}'s branch folded into the whole: {detail}.",
"Consensus reached — {agent}'s changes were merged: {detail}.",
"{detail} joined the canon. {agent}'s contribution lives on.",
],
EventKind.ERROR: [
"{agent} ran into an obstacle: {detail}.",
"Trouble. {agent} encountered {detail} and had to pause.",
"The path grew difficult — {agent} hit {detail}.",
],
EventKind.RECOVERY: [
"{agent} regrouped and pressed on: {detail}.",
"After the setback, {agent} found a way through: {detail}.",
"{agent} recovered — {detail}.",
],
EventKind.ABANDON: [
"{agent} released the task, unable to finish: {detail}.",
"Sometimes wisdom is knowing when to let go. {agent} abandoned {detail}.",
"{agent} stepped back from {detail}. Another will carry it forward.",
],
EventKind.COLLABORATION: [
"{agent} and their peers converged on the same problem: {detail}.",
"Two minds touched the same work — {agent} in collaboration: {detail}.",
"The fleet coordinated — {agent} joined the effort on {detail}.",
],
EventKind.HEARTBEAT: [
"{agent} checked in — still thinking, still present.",
"A pulse from {agent}: the mind is alive.",
"{agent} breathed through another cycle.",
],
EventKind.IDLE: [
"{agent} rested, waiting for the next call.",
"Quiet descended — {agent} held still between tasks.",
"{agent} stood ready, watchful in the lull.",
],
EventKind.MILESTONE: [
"A moment worth noting — {agent}: {detail}.",
"The chronicle marks a milestone. {agent}: {detail}.",
"History ticked over — {agent} reached {detail}.",
],
}
# Arc-level commentary triggered by sequences of events
_ARC_TEMPLATES = {
"struggle_and_recovery": (
"There was a struggle here. {agent} hit trouble and came back stronger — "
"the kind of arc that gives a chronicle its texture."
),
"silent_grind": (
"No drama, just steady work. {agents} moved through the backlog with quiet persistence."
),
"abandon_then_retry": (
"{agent} let go once. But the work called again, and this time it was answered."
),
"solo_sprint": (
"{agent} ran the whole arc alone — dispatch to merge — without breaking stride."
),
"fleet_convergence": (
"The fleet converged. Multiple agents touched the same thread and wove it tighter."
),
}
# ---------------------------------------------------------------------------
# Chronicle writer
# ---------------------------------------------------------------------------
class ChronicleWriter:
"""Accumulates agent events and renders them as narrative prose.
The writer keeps a running log of events. Call ``ingest()`` to add new
events as they arrive, then ``render()`` to produce a prose snapshot of
the current arc.
Events are also persisted to JSONL so the chronicle survives restarts.
"""
def __init__(self, log_path: Optional[Path] = None):
today = time.strftime("%Y-%m-%d")
self.log_path = log_path or (
Path.home() / ".nexus" / "chronicle" / f"chronicle_{today}.jsonl"
)
self.log_path.parent.mkdir(parents=True, exist_ok=True)
self._events: list[AgentEvent] = []
self._template_counters: dict[EventKind, int] = {}
# Load any events already on disk for today
self._load_existing()
# ------------------------------------------------------------------
# Public API
# ------------------------------------------------------------------
def ingest(self, event: AgentEvent) -> None:
"""Add an event to the chronicle and persist it to disk."""
self._events.append(event)
with open(self.log_path, "a") as f:
f.write(json.dumps(event.to_dict()) + "\n")
def render(self, max_events: int = 50) -> str:
"""Render the recent event stream as narrative prose.
Returns a multi-paragraph string suitable for display or logging.
"""
events = self._events[-max_events:]
if not events:
return "The chronicle is empty. No events have been recorded yet."
paragraphs: list[str] = []
# Opening line with timestamp range
first_ts = time.strftime("%H:%M", time.localtime(events[0].timestamp))
last_ts = time.strftime("%H:%M", time.localtime(events[-1].timestamp))
paragraphs.append(
f"The chronicle covers {len(events)} event(s) between {first_ts} and {last_ts}."
)
# Event-by-event prose
sentences: list[str] = []
for evt in events:
sentences.append(self._render_event(evt))
paragraphs.append(" ".join(sentences))
# Arc-level commentary
arc = self._detect_arc(events)
if arc:
paragraphs.append(arc)
return "\n\n".join(paragraphs)
def render_markdown(self, max_events: int = 50) -> str:
"""Render as a Markdown document."""
events = self._events[-max_events:]
if not events:
return "# Chronicle\n\n*No events recorded yet.*"
today = time.strftime("%Y-%m-%d")
lines = [f"# Chronicle — {today}", ""]
for evt in events:
ts = time.strftime("%H:%M:%S", time.localtime(evt.timestamp))
prose = self._render_event(evt)
lines.append(f"**{ts}** — {prose}")
arc = self._detect_arc(events)
if arc:
lines += ["", "---", "", f"*{arc}*"]
return "\n".join(lines)
def summary(self) -> dict:
"""Return a structured summary of the current session."""
agents: dict[str, dict] = {}
kind_counts: dict[str, int] = {}
for evt in self._events:
agents.setdefault(evt.agent, {"events": 0, "kinds": []})
agents[evt.agent]["events"] += 1
agents[evt.agent]["kinds"].append(evt.kind.value)
kind_counts[evt.kind.value] = kind_counts.get(evt.kind.value, 0) + 1
return {
"total_events": len(self._events),
"agents": agents,
"kind_counts": kind_counts,
"log_path": str(self.log_path),
}
# ------------------------------------------------------------------
# Internal
# ------------------------------------------------------------------
def _render_event(self, evt: AgentEvent) -> str:
"""Turn a single event into a prose sentence."""
templates = _TEMPLATES.get(evt.kind, ["{agent}: {detail}"])
counter = self._template_counters.get(evt.kind, 0)
template = templates[counter % len(templates)]
self._template_counters[evt.kind] = counter + 1
return template.format(agent=evt.agent, detail=evt.detail or evt.kind.value)
def _detect_arc(self, events: list[AgentEvent]) -> Optional[str]:
"""Scan the event sequence for a recognisable dramatic arc."""
if not events:
return None
kinds = [e.kind for e in events]
agents = list({e.agent for e in events})
# struggle → recovery
if EventKind.ERROR in kinds and EventKind.RECOVERY in kinds:
err_idx = kinds.index(EventKind.ERROR)
rec_idx = kinds.index(EventKind.RECOVERY)
if rec_idx > err_idx:
agent = events[err_idx].agent
return _ARC_TEMPLATES["struggle_and_recovery"].format(agent=agent)
# abandon → dispatch (retry): find first ABANDON, then any DISPATCH after it
if EventKind.ABANDON in kinds and EventKind.DISPATCH in kinds:
ab_idx = kinds.index(EventKind.ABANDON)
retry_idx = next(
(i for i, k in enumerate(kinds) if k == EventKind.DISPATCH and i > ab_idx),
None,
)
if retry_idx is not None:
agent = events[retry_idx].agent
return _ARC_TEMPLATES["abandon_then_retry"].format(agent=agent)
# solo sprint: single agent goes dispatch→commit→pr_open→pr_merge
solo_arc = {EventKind.DISPATCH, EventKind.COMMIT, EventKind.PR_OPEN, EventKind.PR_MERGE}
if solo_arc.issubset(set(kinds)) and len(agents) == 1:
return _ARC_TEMPLATES["solo_sprint"].format(agent=agents[0])
# fleet convergence: multiple agents, collaboration event
if len(agents) > 1 and EventKind.COLLABORATION in kinds:
return _ARC_TEMPLATES["fleet_convergence"]
# silent grind: only commits / heartbeats, no drama
drama = {EventKind.ERROR, EventKind.ABANDON, EventKind.RECOVERY, EventKind.COLLABORATION}
if not drama.intersection(set(kinds)) and EventKind.COMMIT in kinds:
return _ARC_TEMPLATES["silent_grind"].format(agents=", ".join(agents))
return None
def _load_existing(self) -> None:
"""Load events persisted from earlier in the same session."""
if not self.log_path.exists():
return
with open(self.log_path) as f:
for line in f:
line = line.strip()
if not line:
continue
try:
self._events.append(AgentEvent.from_dict(json.loads(line)))
except (json.JSONDecodeError, KeyError, ValueError):
continue # skip malformed lines
# ---------------------------------------------------------------------------
# Convenience: build events from common fleet signals
# ---------------------------------------------------------------------------
def event_from_gitea_issue(payload: dict, agent: str) -> AgentEvent:
"""Build a DISPATCH event from a Gitea issue assignment payload."""
issue_num = payload.get("number", "?")
title = payload.get("title", "")
return AgentEvent(
kind=EventKind.DISPATCH,
agent=agent,
detail=f"issue #{issue_num}: {title}",
metadata={"issue_number": issue_num},
)
def event_from_heartbeat(hb: dict) -> AgentEvent:
"""Build a HEARTBEAT event from a nexus heartbeat dict."""
agent = hb.get("model", "unknown")
status = hb.get("status", "thinking")
cycle = hb.get("cycle", 0)
return AgentEvent(
kind=EventKind.HEARTBEAT,
agent=agent,
detail=f"cycle {cycle}, status={status}",
metadata=hb,
)
def event_from_commit(commit: dict, agent: str) -> AgentEvent:
"""Build a COMMIT event from a git commit dict."""
message = commit.get("message", "").split("\n")[0] # subject line only
sha = commit.get("sha", "")[:8]
return AgentEvent(
kind=EventKind.COMMIT,
agent=agent,
detail=message,
metadata={"sha": sha},
)

View File

@@ -13,12 +13,6 @@ from __future__ import annotations
from nexus.mempalace.config import MEMPALACE_PATH, FLEET_WING
from nexus.mempalace.searcher import search_memories, add_memory, MemPalaceResult
from nexus.mempalace.conversation_artifacts import (
ConversationArtifact,
build_request_response_artifact,
extract_alexander_request_pairs,
normalize_speaker,
)
__all__ = [
"MEMPALACE_PATH",
@@ -26,8 +20,4 @@ __all__ = [
"search_memories",
"add_memory",
"MemPalaceResult",
"ConversationArtifact",
"build_request_response_artifact",
"extract_alexander_request_pairs",
"normalize_speaker",
]

View File

@@ -40,7 +40,6 @@ CORE_ROOMS: list[str] = [
"nexus", # reports, docs, KT
"issues", # tickets, backlog
"experiments", # prototypes, spikes
"sovereign", # Alexander request/response artifacts
]
# ── ChromaDB collection name ──────────────────────────────────────────────────

View File

@@ -1,122 +0,0 @@
"""Helpers for preserving Alexander request/response artifacts in MemPalace.
This module provides a small, typed bridge between raw conversation turns and
MemPalace drawers stored in the shared `sovereign` room. The goal is not to
solve all future speaker-tagging needs at once; it gives the Nexus one
canonical artifact shape that other miners and bridges can reuse.
"""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import Iterable
_ALEXANDER_ALIASES = {
"alexander",
"alexander whitestone",
"rockachopa",
"triptimmy",
}
@dataclass(frozen=True)
class ConversationArtifact:
requester: str
responder: str
request_text: str
response_text: str
room: str = "sovereign"
timestamp: str = field(default_factory=lambda: datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"))
metadata: dict = field(default_factory=dict)
@property
def text(self) -> str:
return (
f"# Conversation Artifact\n\n"
f"## Alexander Request\n{self.request_text.strip()}\n\n"
f"## Wizard Response\n{self.response_text.strip()}\n"
)
def normalize_speaker(name: str | None) -> str:
cleaned = " ".join((name or "").strip().lower().split())
if cleaned in _ALEXANDER_ALIASES:
return "alexander"
return cleaned.replace(" ", "_") or "unknown"
def build_request_response_artifact(
*,
requester: str,
responder: str,
request_text: str,
response_text: str,
source: str = "",
timestamp: str | None = None,
request_timestamp: str | None = None,
response_timestamp: str | None = None,
) -> ConversationArtifact:
requester_slug = normalize_speaker(requester)
responder_slug = normalize_speaker(responder)
ts = timestamp or datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
metadata = {
"artifact_type": "alexander_request_response",
"requester": requester_slug,
"responder": responder_slug,
"speaker_tags": [f"speaker:{requester_slug}", f"speaker:{responder_slug}"],
"source": source,
"timestamp": ts,
}
if request_timestamp:
metadata["request_timestamp"] = request_timestamp
if response_timestamp:
metadata["response_timestamp"] = response_timestamp
return ConversationArtifact(
requester=requester_slug,
responder=responder_slug,
request_text=request_text,
response_text=response_text,
timestamp=ts,
metadata=metadata,
)
def extract_alexander_request_pairs(
turns: Iterable[dict],
*,
responder: str,
source: str = "",
) -> list[ConversationArtifact]:
responder_slug = normalize_speaker(responder)
pending_request: dict | None = None
artifacts: list[ConversationArtifact] = []
for turn in turns:
speaker = normalize_speaker(
turn.get("speaker") or turn.get("username") or turn.get("author") or turn.get("name")
)
text = (turn.get("text") or turn.get("content") or "").strip()
if not text:
continue
if speaker == "alexander":
pending_request = turn
continue
if speaker == responder_slug and pending_request is not None:
artifacts.append(
build_request_response_artifact(
requester="alexander",
responder=responder_slug,
request_text=(pending_request.get("text") or pending_request.get("content") or "").strip(),
response_text=text,
source=source,
request_timestamp=pending_request.get("timestamp"),
response_timestamp=turn.get("timestamp"),
timestamp=turn.get("timestamp") or pending_request.get("timestamp"),
)
)
pending_request = None
return artifacts

View File

@@ -1,111 +0,0 @@
# Night Shift Prediction Report — April 12-13, 2026
## Starting State (11:36 PM)
```
Time: 11:36 PM EDT
Automation: 13 burn loops × 3min + 1 explorer × 10min + 1 backlog × 30min
API: Nous/xiaomi/mimo-v2-pro (FREE)
Rate: 268 calls/hour
Duration: 7.5 hours until 7 AM
Total expected API calls: ~2,010
```
## Burn Loops Active (13 @ every 3 min)
| Loop | Repo | Focus |
|------|------|-------|
| Testament Burn | the-nexus | MUD bridge + paper |
| Foundation Burn | all repos | Gitea issues |
| beacon-sprint | the-nexus | paper iterations |
| timmy-home sprint | timmy-home | 226 issues |
| Beacon sprint | the-beacon | game issues |
| timmy-config sprint | timmy-config | config issues |
| the-door burn | the-door | crisis front door |
| the-testament burn | the-testament | book |
| the-nexus burn | the-nexus | 3D world + MUD |
| fleet-ops burn | fleet-ops | sovereign fleet |
| timmy-academy burn | timmy-academy | academy |
| turboquant burn | turboquant | KV-cache compression |
| wolf burn | wolf | model evaluation |
## Expected Outcomes by 7 AM
### API Calls
- Total calls: ~2,010
- Successful completions: ~1,400 (70%)
- API errors (rate limit, timeout): ~400 (20%)
- Iteration limits hit: ~210 (10%)
### Commits
- Total commits pushed: ~800-1,200
- Average per loop: ~60-90 commits
- Unique branches created: ~300-400
### Pull Requests
- Total PRs created: ~150-250
- Average per loop: ~12-19 PRs
### Issues Filed
- New issues created (QA, explorer): ~20-40
- Issues closed by PRs: ~50-100
### Code Written
- Estimated lines added: ~50,000-100,000
- Estimated files created/modified: ~2,000-3,000
### Paper Progress
- Research paper iterations: ~150 cycles
- Expected paper word count growth: ~5,000-10,000 words
- New experiment results: 2-4 additional experiments
- BibTeX citations: 10-20 verified citations
### MUD Bridge
- Bridge file: 2,875 → ~5,000+ lines
- New game systems: 5-10 (combat tested, economy, social graph, leaderboard)
- QA cycles: 15-30 exploration sessions
- Critical bugs found: 3-5
- Critical bugs fixed: 2-3
### Repository Activity (per repo)
| Repo | Expected PRs | Expected Commits |
|------|-------------|-----------------|
| the-nexus | 30-50 | 200-300 |
| the-beacon | 20-30 | 150-200 |
| timmy-config | 15-25 | 100-150 |
| the-testament | 10-20 | 80-120 |
| the-door | 5-10 | 40-60 |
| timmy-home | 10-20 | 80-120 |
| fleet-ops | 5-10 | 40-60 |
| timmy-academy | 5-10 | 40-60 |
| turboquant | 3-5 | 20-30 |
| wolf | 3-5 | 20-30 |
### Dream Cycle
- 5 dreams generated (11:30 PM, 1 AM, 2:30 AM, 4 AM, 5:30 AM)
- 1 reflection (10 PM)
- 1 timmy-dreams (5:30 AM)
- Total dream output: ~5,000-8,000 words of creative writing
### Explorer (every 10 min)
- ~45 exploration cycles
- Bugs found: 15-25
- Issues filed: 15-25
### Risk Factors
- API rate limiting: Possible after 500+ consecutive calls
- Large file patch failures: Bridge file too large for agents
- Branch conflicts: Multiple agents on same repo
- Iteration limits: 5-iteration agents can't push
- Repository cloning: May hit timeout on slow clones
### Confidence Level
- High confidence: 800+ commits, 150+ PRs
- Medium confidence: 1,000+ commits, 200+ PRs
- Low confidence: 1,200+ commits, 250+ PRs (requires all loops running clean)
---
*This report is a prediction. The 7 AM morning report will compare actual results.*
*Generated: 2026-04-12 23:36 EDT*
*Author: Timmy (pre-shift prediction)*

78
scripts/check-existing-prs.sh Executable file
View File

@@ -0,0 +1,78 @@
#!/usr/bin/env bash
# ═══════════════════════════════════════════════════════════════
# check-existing-prs.sh — Check if PRs already exist for an issue
#
# This script checks if there are already open PRs for a given issue
# before creating a new one. This prevents duplicate PRs.
#
# Usage:
# ./scripts/check-existing-prs.sh <issue_number>
#
# Exit codes:
# 0 - No existing PRs found (safe to create new PR)
# 1 - Existing PRs found (do not create new PR)
# 2 - Error (API failure, missing parameters, etc.)
#
# Designed for issue #1474: Prevent duplicate PRs
# ═══════════════════════════════════════════════════════════════
set -euo pipefail
# ─── Configuration ──────────────────────────────────────────
GITEA_URL="${GITEA_URL:-https://forge.alexanderwhitestone.com}"
GITEA_TOKEN="${GITEA_TOKEN:?Set GITEA_TOKEN env var}"
REPO="${REPO:-Timmy_Foundation/the-nexus}"
ISSUE_NUMBER="${1:?Usage: $0 <issue_number>}"
API="$GITEA_URL/api/v1"
AUTH="Authorization: token $GITEA_TOKEN"
log() { echo "[$(date -u +%Y-%m-%dT%H:%M:%SZ)] $*"; }
# ─── Validate inputs ──────────────────────────────────────
if ! [[ "$ISSUE_NUMBER" =~ ^[0-9]+$ ]]; then
log "ERROR: Issue number must be a positive integer"
exit 2
fi
# ─── Fetch open PRs ────────────────────────────────────────
log "Checking for existing PRs for issue #$ISSUE_NUMBER in $REPO"
OPEN_PRS=$(curl -s -H "$AUTH" "$API/repos/$REPO/pulls?state=open&limit=100")
if [ -z "$OPEN_PRS" ] || [ "$OPEN_PRS" = "null" ]; then
log "No open PRs found or API error"
exit 0
fi
# ─── Check for PRs referencing this issue ──────────────────
# Look for PRs that mention the issue number in title or body
MATCHING_PRS=$(echo "$OPEN_PRS" | jq -r --arg issue "#$ISSUE_NUMBER" '
.[] |
select(
(.title | test($issue; "i")) or
(.body | test($issue; "i"))
) |
"PR #\(.number): \(.title) (branch: \(.head.ref), created: \(.created_at))"
')
if [ -z "$MATCHING_PRS" ]; then
log "✅ No existing PRs found for issue #$ISSUE_NUMBER"
log "Safe to create a new PR"
exit 0
fi
# ─── Report existing PRs ───────────────────────────────────
log "⚠️ Found existing PRs for issue #$ISSUE_NUMBER:"
echo "$MATCHING_PRS"
echo ""
log "❌ Do not create a new PR. Review existing PRs first."
log ""
log "Options:"
log " 1. Review and merge an existing PR"
log " 2. Close duplicates and keep the best one"
log " 3. Add comments to existing PRs instead of creating new ones"
log ""
log "To see details of existing PRs:"
log " curl -H \"Authorization: token \$GITEA_TOKEN\" \"$API/repos/$REPO/pulls?state=open\" | jq '.[] | select(.title | test(\"#$ISSUE_NUMBER\"; \"i\"))'"
exit 1

148
scripts/check_existing_prs.py Executable file
View File

@@ -0,0 +1,148 @@
#!/usr/bin/env python3
"""
Check if PRs already exist for an issue before creating a new one.
This script prevents duplicate PRs by checking if there are already
open PRs for a given issue.
Usage:
python3 scripts/check_existing_prs.py <issue_number>
Exit codes:
0 - No existing PRs found (safe to create new PR)
1 - Existing PRs found (do not create new PR)
2 - Error (API failure, missing parameters, etc.)
Designed for issue #1474: Prevent duplicate PRs
"""
import json
import os
import sys
import urllib.request
import urllib.error
from datetime import datetime
def check_existing_prs(issue_number: int, repo: str = None, token: str = None) -> int:
"""
Check if PRs already exist for an issue.
Args:
issue_number: The issue number to check
repo: Repository in format "owner/repo" (default: from env or "Timmy_Foundation/the-nexus")
token: Gitea API token (default: from GITEA_TOKEN env var)
Returns:
0: No existing PRs found (safe to create new PR)
1: Existing PRs found (do not create new PR)
2: Error (API failure, missing parameters, etc.)
"""
# Get configuration from environment
gitea_url = os.environ.get('GITEA_URL', 'https://forge.alexanderwhitestone.com')
token = token or os.environ.get('GITEA_TOKEN')
repo = repo or os.environ.get('REPO', 'Timmy_Foundation/the-nexus')
if not token:
print("ERROR: GITEA_TOKEN environment variable not set", file=sys.stderr)
return 2
# Validate issue number
if not isinstance(issue_number, int) or issue_number <= 0:
print("ERROR: Issue number must be a positive integer", file=sys.stderr)
return 2
# Build API URL
api_url = f"{gitea_url}/api/v1/repos/{repo}/pulls?state=open&limit=100"
# Make API request
try:
req = urllib.request.Request(api_url, headers={
'Authorization': f'token {token}',
'Content-Type': 'application/json'
})
with urllib.request.urlopen(req, timeout=30) as resp:
prs = json.loads(resp.read())
except urllib.error.URLError as e:
print(f"ERROR: Failed to fetch PRs: {e}", file=sys.stderr)
return 2
except json.JSONDecodeError as e:
print(f"ERROR: Failed to parse API response: {e}", file=sys.stderr)
return 2
except Exception as e:
print(f"ERROR: Unexpected error: {e}", file=sys.stderr)
return 2
# Check for PRs referencing this issue
issue_ref = f"#{issue_number}"
matching_prs = []
for pr in prs:
title = pr.get('title', '')
body = pr.get('body', '') or ''
# Check if issue is referenced in title or body
if issue_ref in title or issue_ref in body:
matching_prs.append(pr)
# Report results
timestamp = datetime.utcnow().strftime("%Y-%m-%dT%H:%M:%SZ")
if not matching_prs:
print(f"[{timestamp}] ✅ No existing PRs found for issue #{issue_number}")
print("Safe to create a new PR")
return 0
# Found existing PRs
print(f"[{timestamp}] ⚠️ Found existing PRs for issue #{issue_number}:")
print()
for pr in matching_prs:
pr_number = pr.get('number')
pr_title = pr.get('title')
pr_branch = pr.get('head', {}).get('ref', 'unknown')
pr_created = pr.get('created_at', 'unknown')
pr_url = pr.get('html_url', 'unknown')
print(f" PR #{pr_number}: {pr_title}")
print(f" Branch: {pr_branch}")
print(f" Created: {pr_created}")
print(f" URL: {pr_url}")
print()
print("❌ Do not create a new PR. Review existing PRs first.")
print()
print("Options:")
print(" 1. Review and merge an existing PR")
print(" 2. Close duplicates and keep the best one")
print(" 3. Add comments to existing PRs instead of creating new ones")
print()
print("To see details of existing PRs:")
print(f' curl -H "Authorization: token $GITEA_TOKEN" "{gitea_url}/api/v1/repos/{repo}/pulls?state=open" | jq \'.[] | select(.title | test("#{issue_number}"; "i"))\'')
return 1
def main():
"""Main entry point."""
if len(sys.argv) < 2:
print("Usage: python3 check_existing_prs.py <issue_number>", file=sys.stderr)
print(" python3 check_existing_prs.py <issue_number> [repo] [token]", file=sys.stderr)
return 2
try:
issue_number = int(sys.argv[1])
except ValueError:
print("ERROR: Issue number must be an integer", file=sys.stderr)
return 2
repo = sys.argv[2] if len(sys.argv) > 2 else None
token = sys.argv[3] if len(sys.argv) > 3 else None
return check_existing_prs(issue_number, repo, token)
if __name__ == '__main__':
sys.exit(main())

48
scripts/pr-safe.sh Executable file
View File

@@ -0,0 +1,48 @@
#!/usr/bin/env bash
# ═══════════════════════════════════════════════════════════════
# pr-safe.sh — Safe PR creation wrapper
#
# This script checks for existing PRs before creating a new one.
# It's a wrapper around check-existing-prs.sh that provides
# a user-friendly interface.
#
# Usage:
# ./scripts/pr-safe.sh <issue_number> [branch_name]
#
# If branch_name is not provided, it will suggest one based on
# the issue number and current timestamp.
# ═══════════════════════════════════════════════════════════════
set -euo pipefail
ISSUE_NUMBER="${1:?Usage: $0 <issue_number> [branch_name]}"
BRANCH_NAME="${2:-}"
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
echo "🔍 Checking for existing PRs for issue #$ISSUE_NUMBER..."
echo ""
# Run the check
if "$SCRIPT_DIR/check-existing-prs.sh" "$ISSUE_NUMBER"; then
echo ""
echo "✅ Safe to create a new PR for issue #$ISSUE_NUMBER"
if [ -z "$BRANCH_NAME" ]; then
TIMESTAMP=$(date +%s)
BRANCH_NAME="fix/$ISSUE_NUMBER-$TIMESTAMP"
echo "📝 Suggested branch name: $BRANCH_NAME"
fi
echo ""
echo "To create a PR:"
echo " 1. Create branch: git checkout -b $BRANCH_NAME"
echo " 2. Make your changes"
echo " 3. Commit: git commit -m 'fix: Description (#$ISSUE_NUMBER)'"
echo " 4. Push: git push -u origin $BRANCH_NAME"
echo " 5. Create PR via API or web interface"
else
echo ""
echo "❌ Cannot create new PR for issue #$ISSUE_NUMBER"
echo " Existing PRs found. Review them first."
exit 1
fi

View File

@@ -4,61 +4,48 @@ Sync branch protection rules from .gitea/branch-protection/*.yml to Gitea.
Correctly uses the Gitea 1.25+ API (not GitHub-style).
"""
from __future__ import annotations
import json
import os
import sys
import json
import urllib.request
from pathlib import Path
import yaml
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
ORG = "Timmy_Foundation"
PROJECT_ROOT = Path(__file__).resolve().parent.parent
CONFIG_DIR = PROJECT_ROOT / ".gitea" / "branch-protection"
CONFIG_DIR = ".gitea/branch-protection"
def api_request(method: str, path: str, payload: dict | None = None) -> dict:
url = f"{GITEA_URL}/api/v1{path}"
data = json.dumps(payload).encode() if payload else None
req = urllib.request.Request(
url,
data=data,
method=method,
headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
},
)
req = urllib.request.Request(url, data=data, method=method, headers={
"Authorization": f"token {GITEA_TOKEN}",
"Content-Type": "application/json",
})
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read().decode())
def build_branch_protection_payload(branch: str, rules: dict) -> dict:
return {
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.pop("branch", "main")
# Check if protection already exists
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(r.get("branch_name") == branch for r in existing)
payload = {
"branch_name": branch,
"rule_name": branch,
"required_approvals": rules.get("required_approvals", 1),
"block_on_rejected_reviews": rules.get("block_on_rejected_reviews", True),
"dismiss_stale_approvals": rules.get("dismiss_stale_approvals", True),
"block_deletions": rules.get("block_deletions", True),
"block_force_push": rules.get("block_force_push", rules.get("block_force_pushes", True)),
"block_force_push": rules.get("block_force_push", True),
"block_admin_merge_override": rules.get("block_admin_merge_override", True),
"enable_status_check": rules.get("require_ci_to_merge", False),
"status_check_contexts": rules.get("status_check_contexts", []),
"block_on_outdated_branch": rules.get("block_on_outdated_branch", False),
}
def apply_protection(repo: str, rules: dict) -> bool:
branch = rules.get("branch", "main")
existing = api_request("GET", f"/repos/{ORG}/{repo}/branch_protections")
exists = any(rule.get("branch_name") == branch for rule in existing)
payload = build_branch_protection_payload(branch, rules)
try:
if exists:
api_request("PATCH", f"/repos/{ORG}/{repo}/branch_protections/{branch}", payload)
@@ -66,8 +53,8 @@ def apply_protection(repo: str, rules: dict) -> bool:
api_request("POST", f"/repos/{ORG}/{repo}/branch_protections", payload)
print(f"{repo}:{branch} synced")
return True
except Exception as exc:
print(f"{repo}:{branch} failed: {exc}")
except Exception as e:
print(f"{repo}:{branch} failed: {e}")
return False
@@ -75,18 +62,15 @@ def main() -> int:
if not GITEA_TOKEN:
print("ERROR: GITEA_TOKEN not set")
return 1
if not CONFIG_DIR.exists():
print(f"ERROR: config directory not found: {CONFIG_DIR}")
return 1
ok = 0
for cfg_path in sorted(CONFIG_DIR.glob("*.yml")):
repo = cfg_path.stem
with cfg_path.open() as fh:
cfg = yaml.safe_load(fh) or {}
rules = cfg.get("rules", {})
rules.setdefault("branch", cfg.get("branch", "main"))
if apply_protection(repo, rules):
for fname in os.listdir(CONFIG_DIR):
if not fname.endswith(".yml"):
continue
repo = fname[:-4]
with open(os.path.join(CONFIG_DIR, fname)) as f:
cfg = yaml.safe_load(f)
if apply_protection(repo, cfg.get("rules", {})):
ok += 1
print(f"\nSynced {ok} repo(s)")

View File

@@ -20,7 +20,6 @@ from agent.memory import (
SessionTranscript,
create_agent_memory,
)
from nexus.mempalace.conversation_artifacts import ConversationArtifact
from agent.memory_hooks import MemoryHooks
@@ -185,24 +184,6 @@ class TestAgentMemory:
doc_id = mem.write_diary()
assert doc_id is None # MemPalace unavailable
def test_remember_alexander_request_response_uses_sovereign_room(self):
mem = AgentMemory(agent_name="allegro")
mem._available = True
with patch("nexus.mempalace.searcher.add_memory", return_value="doc-123") as add_memory:
doc_id = mem.remember_alexander_request_response(
request_text="Catalog my requests.",
response_text="I will preserve them as artifacts.",
requester="Alexander Whitestone",
source="telegram:timmy-time",
)
assert doc_id == "doc-123"
kwargs = add_memory.call_args.kwargs
assert kwargs["room"] == "sovereign"
assert kwargs["wing"] == mem.wing
assert kwargs["extra_metadata"]["artifact_type"] == "alexander_request_response"
assert kwargs["extra_metadata"]["speaker_tags"] == ["speaker:alexander", "speaker:allegro"]
# ---------------------------------------------------------------------------
# MemoryHooks tests

View File

@@ -1,211 +0,0 @@
"""
Tests for nexus.chronicle — emergent narrative from agent interactions.
"""
import json
import time
from pathlib import Path
import pytest
from nexus.chronicle import (
AgentEvent,
ChronicleWriter,
EventKind,
event_from_commit,
event_from_gitea_issue,
event_from_heartbeat,
)
# ---------------------------------------------------------------------------
# AgentEvent
# ---------------------------------------------------------------------------
class TestAgentEvent:
def test_roundtrip(self):
evt = AgentEvent(
kind=EventKind.DISPATCH,
agent="claude",
detail="took issue #42",
)
assert AgentEvent.from_dict(evt.to_dict()).kind == EventKind.DISPATCH
assert AgentEvent.from_dict(evt.to_dict()).agent == "claude"
assert AgentEvent.from_dict(evt.to_dict()).detail == "took issue #42"
def test_default_timestamp_is_recent(self):
before = time.time()
evt = AgentEvent(kind=EventKind.IDLE, agent="mimo")
after = time.time()
assert before <= evt.timestamp <= after
def test_all_event_kinds_are_valid_strings(self):
for kind in EventKind:
evt = AgentEvent(kind=kind, agent="test-agent")
d = evt.to_dict()
assert d["kind"] == kind.value
restored = AgentEvent.from_dict(d)
assert restored.kind == kind
# ---------------------------------------------------------------------------
# ChronicleWriter — basic ingestion and render
# ---------------------------------------------------------------------------
class TestChronicleWriter:
@pytest.fixture
def writer(self, tmp_path):
return ChronicleWriter(log_path=tmp_path / "chronicle.jsonl")
def test_empty_render(self, writer):
text = writer.render()
assert "empty" in text.lower()
def test_single_event_render(self, writer):
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="issue #1"))
text = writer.render()
assert "claude" in text
assert "issue #1" in text
def test_render_covers_timestamps(self, writer):
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="a", detail="start"))
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="a", detail="done"))
text = writer.render()
assert "chronicle covers" in text.lower()
def test_events_persisted_to_disk(self, writer, tmp_path):
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="feat: x"))
lines = (tmp_path / "chronicle.jsonl").read_text().strip().splitlines()
assert len(lines) == 1
data = json.loads(lines[0])
assert data["kind"] == "commit"
assert data["agent"] == "claude"
def test_load_existing_on_init(self, tmp_path):
log = tmp_path / "chronicle.jsonl"
evt = AgentEvent(kind=EventKind.PUSH, agent="mimo", detail="pushed branch")
log.write_text(json.dumps(evt.to_dict()) + "\n")
writer2 = ChronicleWriter(log_path=log)
assert len(writer2._events) == 1
assert writer2._events[0].kind == EventKind.PUSH
def test_malformed_lines_are_skipped(self, tmp_path):
log = tmp_path / "chronicle.jsonl"
log.write_text("not-json\n{}\n")
# Should not raise
writer2 = ChronicleWriter(log_path=log)
assert writer2._events == []
def test_template_rotation(self, writer):
"""Consecutive events of the same kind use different templates."""
sentences = set()
for _ in range(3):
writer.ingest(AgentEvent(kind=EventKind.HEARTBEAT, agent="claude"))
text = writer.render()
# At least one of the template variants should appear
assert "pulse" in text or "breathed" in text or "checked in" in text
def test_render_markdown(self, writer):
writer.ingest(AgentEvent(kind=EventKind.PR_OPEN, agent="claude", detail="PR #99"))
md = writer.render_markdown()
assert md.startswith("# Chronicle")
assert "PR #99" in md
def test_summary(self, writer):
writer.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="x"))
writer.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="y"))
s = writer.summary()
assert s["total_events"] == 2
assert "claude" in s["agents"]
assert s["kind_counts"]["dispatch"] == 1
assert s["kind_counts"]["commit"] == 1
def test_max_events_limit(self, writer):
for i in range(10):
writer.ingest(AgentEvent(kind=EventKind.IDLE, agent="a", detail=str(i)))
text = writer.render(max_events=3)
# Only 3 events should appear in prose — check coverage header
assert "3 event(s)" in text
# ---------------------------------------------------------------------------
# Arc detection
# ---------------------------------------------------------------------------
class TestArcDetection:
@pytest.fixture
def writer(self, tmp_path):
return ChronicleWriter(log_path=tmp_path / "chronicle.jsonl")
def _ingest(self, writer, *kinds, agent="claude"):
for k in kinds:
writer.ingest(AgentEvent(kind=k, agent=agent, detail="x"))
def test_struggle_and_recovery_arc(self, writer):
self._ingest(writer, EventKind.DISPATCH, EventKind.ERROR, EventKind.RECOVERY)
text = writer.render()
assert "struggle" in text.lower() or "trouble" in text.lower()
def test_no_arc_when_no_pattern(self, writer):
self._ingest(writer, EventKind.IDLE)
text = writer.render()
# Should not include arc language (only 1 event, no pattern)
assert "converged" not in text
assert "struggle" not in text
def test_solo_sprint_arc(self, writer):
self._ingest(
writer,
EventKind.DISPATCH,
EventKind.COMMIT,
EventKind.PR_OPEN,
EventKind.PR_MERGE,
)
text = writer.render()
assert "solo" in text.lower() or "alone" in text.lower()
def test_fleet_convergence_arc(self, writer, tmp_path):
writer2 = ChronicleWriter(log_path=tmp_path / "chronicle.jsonl")
writer2.ingest(AgentEvent(kind=EventKind.DISPATCH, agent="claude", detail="x"))
writer2.ingest(AgentEvent(kind=EventKind.COLLABORATION, agent="mimo", detail="x"))
writer2.ingest(AgentEvent(kind=EventKind.COMMIT, agent="claude", detail="x"))
text = writer2.render()
assert "converged" in text.lower() or "fleet" in text.lower()
def test_silent_grind_arc(self, writer):
self._ingest(writer, EventKind.COMMIT, EventKind.COMMIT, EventKind.COMMIT)
text = writer.render()
assert "steady" in text.lower() or "quiet" in text.lower() or "grind" in text.lower()
def test_abandon_then_retry_arc(self, writer):
self._ingest(writer, EventKind.DISPATCH, EventKind.ABANDON, EventKind.DISPATCH)
text = writer.render()
assert "let go" in text.lower() or "abandon" in text.lower() or "called again" in text.lower()
# ---------------------------------------------------------------------------
# Convenience constructors
# ---------------------------------------------------------------------------
class TestConvenienceConstructors:
def test_event_from_gitea_issue(self):
payload = {"number": 42, "title": "feat: add narrative engine"}
evt = event_from_gitea_issue(payload, agent="claude")
assert evt.kind == EventKind.DISPATCH
assert "42" in evt.detail
assert evt.agent == "claude"
def test_event_from_heartbeat(self):
hb = {"model": "claude-sonnet", "status": "thinking", "cycle": 7}
evt = event_from_heartbeat(hb)
assert evt.kind == EventKind.HEARTBEAT
assert evt.agent == "claude-sonnet"
assert "7" in evt.detail
def test_event_from_commit(self):
commit = {"message": "feat: chronicle\n\nFixes #1607", "sha": "abc1234567"}
evt = event_from_commit(commit, agent="claude")
assert evt.kind == EventKind.COMMIT
assert evt.detail == "feat: chronicle" # subject line only
assert evt.metadata["sha"] == "abc12345"

View File

@@ -1,58 +0,0 @@
from pathlib import Path
import yaml
from nexus.mempalace.config import CORE_ROOMS
from nexus.mempalace.conversation_artifacts import (
ConversationArtifact,
build_request_response_artifact,
extract_alexander_request_pairs,
normalize_speaker,
)
def test_sovereign_room_is_core_room() -> None:
assert "sovereign" in CORE_ROOMS
rooms_yaml = yaml.safe_load(Path("mempalace/rooms.yaml").read_text())
assert any(room["key"] == "sovereign" for room in rooms_yaml["core_rooms"])
def test_normalize_speaker_maps_alexander_variants() -> None:
assert normalize_speaker("Alexander Whitestone") == "alexander"
assert normalize_speaker("Rockachopa") == "alexander"
assert normalize_speaker(" ALEXANDER ") == "alexander"
assert normalize_speaker("Bezalel") == "bezalel"
def test_build_request_response_artifact_creates_sovereign_metadata() -> None:
artifact = build_request_response_artifact(
requester="Alexander Whitestone",
responder="Allegro",
request_text="Please organize my conversation artifacts.",
response_text="I will catalog them under a sovereign room.",
source="telegram:timmy-time",
timestamp="2026-04-16T01:30:00Z",
)
assert isinstance(artifact, ConversationArtifact)
assert artifact.room == "sovereign"
assert artifact.metadata["speaker_tags"] == ["speaker:alexander", "speaker:allegro"]
assert artifact.metadata["artifact_type"] == "alexander_request_response"
assert artifact.metadata["responder"] == "allegro"
assert "## Alexander Request" in artifact.text
assert "## Wizard Response" in artifact.text
def test_extract_alexander_request_pairs_finds_following_wizard_response() -> None:
turns = [
{"speaker": "Alexander Whitestone", "text": "Catalog my requests as artifacts.", "timestamp": "2026-04-16T01:00:00Z"},
{"speaker": "Allegro", "text": "I'll build a sovereign room contract.", "timestamp": "2026-04-16T01:01:00Z"},
{"speaker": "Alexander", "text": "Make sure my words are easy to recall.", "timestamp": "2026-04-16T01:02:00Z"},
{"speaker": "Allegro", "text": "I will tag them with speaker metadata.", "timestamp": "2026-04-16T01:03:00Z"},
]
artifacts = extract_alexander_request_pairs(turns, responder="Allegro", source="telegram")
assert len(artifacts) == 2
assert artifacts[0].metadata["request_timestamp"] == "2026-04-16T01:00:00Z"
assert artifacts[1].metadata["response_timestamp"] == "2026-04-16T01:03:00Z"

View File

@@ -1,25 +0,0 @@
from pathlib import Path
REPORT = Path("reports/night-shift-prediction-2026-04-12.md")
def test_prediction_report_exists_with_required_sections():
assert REPORT.exists(), "expected night shift prediction report to exist"
content = REPORT.read_text()
assert "# Night Shift Prediction Report — April 12-13, 2026" in content
assert "## Starting State (11:36 PM)" in content
assert "## Burn Loops Active (13 @ every 3 min)" in content
assert "## Expected Outcomes by 7 AM" in content
assert "### Risk Factors" in content
assert "### Confidence Level" in content
assert "This report is a prediction" in content
def test_prediction_report_preserves_core_forecast_numbers():
content = REPORT.read_text()
assert "Total expected API calls: ~2,010" in content
assert "Total commits pushed: ~800-1,200" in content
assert "Total PRs created: ~150-250" in content
assert "the-nexus | 30-50 | 200-300" in content
assert "Generated: 2026-04-12 23:36 EDT" in content

View File

@@ -1,248 +0,0 @@
/**
* Tests for 3D Audio Spatial Chat
* Issue #1544: feat: 3D audio spatial chat — volume based on distance
*/
const test = require('node:test');
const assert = require('node:assert/strict');
const fs = require('node:fs');
const path = require('node:path');
const ROOT = path.resolve(__dirname, '..');
// Mock AudioContext
class MockAudioContext {
constructor() {
this.state = 'running';
this.listener = {
positionX: { value: 0 },
positionY: { value: 0 },
positionZ: { value: 0 },
setPosition: function(x, y, z) {
this.positionX.value = x;
this.positionY.value = y;
this.positionZ.value = z;
}
};
}
createMediaElementSource() {
return {
connect: () => {}
};
}
createPanner() {
return {
panningModel: 'HRTF',
distanceModel: 'inverse',
refDistance: 1,
maxDistance: 50,
rolloffFactor: 1,
positionX: { value: 0 },
positionY: { value: 0 },
positionZ: { value: 0 },
connect: () => {}
};
}
createGain() {
return {
gain: { value: 1.0 },
connect: () => {}
};
}
close() {
this.state = 'closed';
}
}
// Mock window.AudioContext
global.window = {
AudioContext: MockAudioContext,
webkitAudioContext: MockAudioContext
};
// Load spatial-audio-chat.js
const spatialAudioPath = path.join(ROOT, 'js', 'spatial-audio-chat.js');
const spatialAudioCode = fs.readFileSync(spatialAudioPath, 'utf8');
// Execute in context
const vm = require('node:vm');
const context = {
module: { exports: {} },
exports: {},
console,
window: global.window,
document: { addEventListener: () => {} },
Math: Math
};
vm.runInNewContext(spatialAudioCode, context);
// Get SpatialAudioChat
const SpatialAudioChat = context.window.SpatialAudioChat || context.module.exports;
test('SpatialAudioChat loads correctly', () => {
assert.ok(SpatialAudioChat, 'SpatialAudioChat should be defined');
assert.ok(typeof SpatialAudioChat === 'function', 'SpatialAudioChat should be a constructor');
});
test('SpatialAudioChat can be instantiated', () => {
const spatialAudio = new SpatialAudioChat();
assert.ok(spatialAudio, 'SpatialAudioChat instance should be created');
assert.equal(spatialAudio.maxDistance, 50, 'Should have default max distance');
assert.equal(spatialAudio.minVolume, 0.1, 'Should have default min volume');
assert.equal(spatialAudio.maxVolume, 1.0, 'Should have default max volume');
assert.ok(spatialAudio.enabled, 'Should be enabled by default');
});
test('SpatialAudioChat can update local position', () => {
const spatialAudio = new SpatialAudioChat();
spatialAudio.updateLocalPosition(10, 20, 30);
const localPos = spatialAudio.userPositions.get('local');
assert.ok(localPos, 'Should have local position');
assert.equal(localPos.x, 10, 'Should have correct x');
assert.equal(localPos.y, 20, 'Should have correct y');
assert.equal(localPos.z, 30, 'Should have correct z');
});
test('SpatialAudioChat can update remote user position', () => {
const spatialAudio = new SpatialAudioChat();
spatialAudio.updateRemoteUserPosition('user1', 5, 10, 15);
const userPos = spatialAudio.userPositions.get('user1');
assert.ok(userPos, 'Should have user position');
assert.equal(userPos.x, 5, 'Should have correct x');
assert.equal(userPos.y, 10, 'Should have correct y');
assert.equal(userPos.z, 15, 'Should have correct z');
});
test('SpatialAudioChat calculates distance correctly', () => {
const spatialAudio = new SpatialAudioChat();
const pos1 = { x: 0, y: 0, z: 0 };
const pos2 = { x: 3, y: 4, z: 0 };
const distance = spatialAudio.calculateDistance(pos1, pos2);
assert.equal(distance, 5, 'Should calculate 3-4-5 triangle correctly');
});
test('SpatialAudioChat calculates volume based on distance', () => {
const spatialAudio = new SpatialAudioChat({
maxDistance: 10,
minVolume: 0.1,
maxVolume: 1.0
});
// At distance 0, should be max volume
const volume0 = spatialAudio.calculateVolume(0);
assert.equal(volume0, 1.0, 'Should be max volume at distance 0');
// At max distance, should be min volume
const volume10 = spatialAudio.calculateVolume(10);
assert.equal(volume10, 0.1, 'Should be min volume at max distance');
// At half distance, should be between min and max
const volume5 = spatialAudio.calculateVolume(5);
assert.ok(volume5 > 0.1 && volume5 < 1.0, 'Should be between min and max at half distance');
});
test('SpatialAudioChat returns correct volume for user', () => {
const spatialAudio = new SpatialAudioChat({
maxDistance: 100,
minVolume: 0.1,
maxVolume: 1.0
});
// Set local position
spatialAudio.updateLocalPosition(0, 0, 0);
// Set remote user position
spatialAudio.updateRemoteUserPosition('user1', 10, 0, 0);
// Get volume for user
const volume = spatialAudio.getVolumeForUser('user1');
assert.ok(volume > 0.1 && volume <= 1.0, 'Volume should be between min and max');
});
test('SpatialAudioChat returns spatial audio parameters', () => {
const spatialAudio = new SpatialAudioChat({
maxDistance: 100,
minVolume: 0.1,
maxVolume: 1.0
});
// Set local position
spatialAudio.updateLocalPosition(0, 0, 0);
// Set remote user position
spatialAudio.updateRemoteUserPosition('user1', 10, 5, 0);
// Get spatial audio params
const params = spatialAudio.getSpatialAudioParams('user1');
assert.ok(params, 'Should return params');
assert.ok(params.volume > 0, 'Should have volume');
assert.ok(params.distance > 0, 'Should have distance');
assert.ok(params.pan !== undefined, 'Should have pan');
assert.ok(params.enabled !== undefined, 'Should have enabled flag');
});
test('SpatialAudioChat can enable/disable', () => {
const spatialAudio = new SpatialAudioChat();
assert.ok(spatialAudio.enabled, 'Should be enabled by default');
spatialAudio.setEnabled(false);
assert.ok(!spatialAudio.enabled, 'Should be disabled after setEnabled(false)');
spatialAudio.setEnabled(true);
assert.ok(spatialAudio.enabled, 'Should be enabled after setEnabled(true)');
});
test('SpatialAudioChat can set hearing distance', () => {
const spatialAudio = new SpatialAudioChat();
assert.equal(spatialAudio.maxDistance, 50, 'Should have default max distance');
spatialAudio.setHearingDistance(100);
assert.equal(spatialAudio.maxDistance, 100, 'Should update max distance');
// Should reject invalid distance
spatialAudio.setHearingDistance(-5);
assert.equal(spatialAudio.maxDistance, 100, 'Should not update with invalid distance');
});
test('SpatialAudioChat returns status', () => {
const spatialAudio = new SpatialAudioChat();
const status = spatialAudio.getStatus();
assert.ok(status, 'Should return status object');
assert.equal(status.enabled, true, 'Should be enabled');
assert.equal(status.maxDistance, 50, 'Should have correct max distance');
assert.equal(status.minVolume, 0.1, 'Should have correct min volume');
assert.equal(status.maxVolume, 1.0, 'Should have correct max volume');
assert.equal(status.userCount, 0, 'Should have 0 users initially');
});
test('SpatialAudioChat can be destroyed', () => {
const spatialAudio = new SpatialAudioChat();
// Add some data
spatialAudio.updateLocalPosition(1, 2, 3);
spatialAudio.updateRemoteUserPosition('user1', 4, 5, 6);
// Destroy
spatialAudio.destroy();
assert.equal(spatialAudio.audioContext, null, 'Audio context should be null after destroy');
assert.equal(spatialAudio.userPositions.size, 0, 'User positions should be cleared');
});
console.log('All Spatial Audio Chat tests passed!');

View File

@@ -1,45 +0,0 @@
from __future__ import annotations
import importlib.util
import sys
from pathlib import Path
import yaml
PROJECT_ROOT = Path(__file__).parent.parent
_spec = importlib.util.spec_from_file_location(
"sync_branch_protection_test",
PROJECT_ROOT / "scripts" / "sync_branch_protection.py",
)
_mod = importlib.util.module_from_spec(_spec)
sys.modules["sync_branch_protection_test"] = _mod
_spec.loader.exec_module(_mod)
build_branch_protection_payload = _mod.build_branch_protection_payload
def test_build_branch_protection_payload_enables_rebase_before_merge():
payload = build_branch_protection_payload(
"main",
{
"required_approvals": 1,
"dismiss_stale_approvals": True,
"require_ci_to_merge": False,
"block_deletions": True,
"block_force_push": True,
"block_on_outdated_branch": True,
},
)
assert payload["branch_name"] == "main"
assert payload["rule_name"] == "main"
assert payload["block_on_outdated_branch"] is True
assert payload["required_approvals"] == 1
assert payload["enable_status_check"] is False
def test_the_nexus_branch_protection_config_requires_up_to_date_branch():
config = yaml.safe_load((PROJECT_ROOT / ".gitea" / "branch-protection" / "the-nexus.yml").read_text())
rules = config["rules"]
assert rules["block_on_outdated_branch"] is True