Compare commits

..

3 Commits

Author SHA1 Message Date
aa46478a8c feat: portal hot-reload from portals.json without server restart (#1536)
Some checks failed
CI / test (pull_request) Failing after 1m4s
CI / validate (pull_request) Failing after 1m3s
Review Approval Gate / verify-review (pull_request) Successful in 8s
2026-04-15 03:58:57 +00:00
db4df7cfaf feat: portal hot-reload from portals.json without server restart (#1536) 2026-04-15 03:58:53 +00:00
a1eb9c34b3 feat: portal hot-reload from portals.json without server restart (#1536) 2026-04-15 03:58:49 +00:00
5 changed files with 164 additions and 539 deletions

161
app.js
View File

@@ -9,16 +9,11 @@ import { MemoryBirth } from './nexus/components/memory-birth.js';
import { MemoryOptimizer } from './nexus/components/memory-optimizer.js';
import { MemoryInspect } from './nexus/components/memory-inspect.js';
import { MemoryPulse } from './nexus/components/memory-pulse.js';
import { ReasoningTrace } from './nexus/components/reasoning-trace.js';
// ═══════════════════════════════════════════
// NEXUS v1.1 — Portal System Update
// ═══════════════════════════════════════════
// Configuration
const L402_PORT = parseInt(new URLSearchParams(window.location.search).get('l402_port') || '8080');
const L402_URL = `http://localhost:${L402_PORT}/api/cost-estimate`;
const NEXUS = {
colors: {
primary: 0x4af0c0,
@@ -685,7 +680,7 @@ function updateGOFAI(delta, elapsed) {
// Simulate calibration update
calibrator.update({ input_tokens: 100, complexity_score: 0.5 }, 0.06);
if (Math.random() > 0.95) l402Client.fetchWithL402(L402_URL);
if (Math.random() > 0.95) l402Client.fetchWithL402("http://localhost:8080/api/cost-estimate");
}
metaLayer.track(startTime);
@@ -763,7 +758,6 @@ async function init() {
SpatialAudio.bindSpatialMemory(SpatialMemory);
MemoryInspect.init({ onNavigate: _navigateToMemory });
MemoryPulse.init(SpatialMemory);
ReasoningTrace.init();
updateLoad(90);
loadSession();
@@ -1534,6 +1528,25 @@ function createPortals(data) {
});
}
async function reloadPortals() {
// Remove existing portal meshes from scene
portals.forEach(p => {
if (p.group) scene.remove(p.group);
});
portals.length = 0;
try {
const response = await fetch('./portals.json');
const portalData = await response.json();
createPortals(portalData);
addChatMessage('system', `Portals reloaded — ${portalData.length} portal(s) online.`);
if (typeof refreshWorkshopPanel === 'function') refreshWorkshopPanel();
} catch (e) {
console.error('Failed to reload portals.json:', e);
addChatMessage('error', 'Portal reload failed. Check portals.json.');
}
}
function createPortal(config) {
const group = new THREE.Group();
group.position.set(config.position.x, config.position.y, config.position.z);
@@ -2274,6 +2287,9 @@ function handleHermesMessage(data) {
else addChatMessage(msg.agent, msg.text, false);
});
}
} else if (data.type === 'portals_reload') {
console.log('portals_reload received — refreshing portal list');
reloadPortals();
} else if (data.type && data.type.startsWith('evennia.')) {
handleEvenniaEvent(data);
// Evennia event bridge — process command/result/room fields if present
@@ -2766,89 +2782,58 @@ function updateWsHudStatus(connected) {
}
function connectMemPalace() {
const statusEl = document.getElementById('mem-palace-status');
const ratioEl = document.getElementById('compression-ratio');
const docsEl = document.getElementById('docs-mined');
const sizeEl = document.getElementById('aaak-size');
// Show connecting state
if (statusEl) {
statusEl.textContent = 'MEMPALACE CONNECTING';
statusEl.style.color = '#ffd700';
statusEl.style.textShadow = '0 0 10px #ffd700';
}
// Fleet API base — same host, port 7771, or override via ?mempalace=host:port
const params = new URLSearchParams(window.location.search);
const override = params.get('mempalace');
const apiBase = override
? `http://${override}`
: `${window.location.protocol}//${window.location.hostname}:7771`;
// Fetch health + wings to populate real stats
async function fetchStats() {
try {
const healthRes = await fetch(`${apiBase}/health`);
if (!healthRes.ok) throw new Error(`Health ${healthRes.status}`);
const health = await healthRes.json();
const wingsRes = await fetch(`${apiBase}/wings`);
const wings = wingsRes.ok ? await wingsRes.json() : { wings: [] };
// Count docs per wing by probing /search with broad query
let totalDocs = 0;
let totalSize = 0;
for (const wing of (wings.wings || [])) {
try {
const sr = await fetch(`${apiBase}/search?q=*&wing=${wing}&n=1`);
if (sr.ok) {
const sd = await sr.json();
totalDocs += sd.count || 0;
}
} catch (_) { /* skip */ }
}
const compressionRatio = totalDocs > 0 ? Math.max(1, Math.round(totalDocs * 0.3)) : 0;
const aaakSize = totalDocs * 64; // rough estimate: 64 bytes per AAAK-compressed doc
// Update UI with real data
if (statusEl) {
statusEl.textContent = 'MEMPALACE ACTIVE';
statusEl.style.color = '#4af0c0';
statusEl.style.textShadow = '0 0 10px #4af0c0';
}
if (ratioEl) ratioEl.textContent = `${compressionRatio}x`;
if (docsEl) docsEl.textContent = String(totalDocs);
if (sizeEl) sizeEl.textContent = formatBytes(aaakSize);
console.log(`[MemPalace] Connected to ${apiBase}${totalDocs} docs across ${wings.wings?.length || 0} wings`);
return true;
} catch (err) {
console.warn('[MemPalace] Fleet API unavailable:', err.message);
if (statusEl) {
statusEl.textContent = 'MEMPALACE OFFLINE';
statusEl.style.color = '#ff4466';
statusEl.style.textShadow = '0 0 10px #ff4466';
}
if (ratioEl) ratioEl.textContent = '--x';
if (docsEl) docsEl.textContent = '0';
if (sizeEl) sizeEl.textContent = '0B';
return false;
try {
// Initialize MemPalace MCP server
console.log('Initializing MemPalace memory system...');
// Actual MCP server connection
const statusEl = document.getElementById('mem-palace-status');
if (statusEl) {
statusEl.textContent = 'MemPalace ACTIVE';
statusEl.style.color = '#4af0c0';
statusEl.style.textShadow = '0 0 10px #4af0c0';
}
// Initialize MCP server connection
if (window.Claude && window.Claude.mcp) {
window.Claude.mcp.add('mempalace', {
init: () => {
return { status: 'active', version: '3.0.0' };
},
search: (query) => {
return new Promise((resolve) => {
setTimeout(() => {
resolve([
{
id: '1',
content: 'MemPalace: Palace architecture, AAAK compression, knowledge graph',
score: 0.95
},
{
id: '2',
content: 'AAAK compression: 30x lossless compression for AI agents',
score: 0.88
}
]);
}, 500);
});
}
});
}
// Initialize memory stats tracking
document.getElementById('compression-ratio').textContent = '0x';
document.getElementById('docs-mined').textContent = '0';
document.getElementById('aaak-size').textContent = '0B';
} catch (err) {
console.error('Failed to initialize MemPalace:', err);
const statusEl = document.getElementById('mem-palace-status');
if (statusEl) {
statusEl.textContent = 'MemPalace ERROR';
statusEl.style.color = '#ff4466';
statusEl.style.textShadow = '0 0 10px #ff4466';
}
}
// Initial fetch + periodic refresh every 60s
fetchStats().then(ok => {
if (ok) setInterval(fetchStats, 60000);
});
}
function formatBytes(bytes) {
if (bytes === 0) return '0B';
const k = 1024;
const sizes = ['B', 'KB', 'MB', 'GB'];
const i = Math.floor(Math.log(bytes) / Math.log(k));
return parseFloat((bytes / Math.pow(k, i)).toFixed(1)) + sizes[i];
}
function mineMemPalaceContent() {

View File

@@ -1,277 +0,0 @@
#!/usr/bin/env python3
"""
backlog_triage.py — Triage open issues in a Gitea repository.
Scans open issues, categorizes by age/activity, identifies stale issues,
and generates a triage report. Optionally auto-closes stale issues.
Usage:
python3 bin/backlog_triage.py --repo Timmy_Foundation/the-nexus
python3 bin/backlog_triage.py --repo Timmy_Foundation/the-nexus --stale-days 60 --report out.json
python3 bin/backlog_triage.py --repo Timmy_Foundation/the-nexus --auto-close-stale --dry-run
"""
import argparse
import json
import os
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Optional
import urllib.request
import urllib.error
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
DEFAULT_STALE_DAYS = 30
DEFAULT_IDLE_DAYS = 60
BATCH_SIZE = 50 # Gitea API page size
# ---------------------------------------------------------------------------
# API helpers
# ---------------------------------------------------------------------------
def _api(token: str, method: str, path: str, data: dict = None) -> dict:
"""Make a Gitea API call."""
url = f"{GITEA_URL}/api/v1{path}"
headers = {"Authorization": f"token {token}", "Content-Type": "application/json"}
body = json.dumps(data).encode() if data else None
req = urllib.request.Request(url, data=body, headers=headers, method=method)
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read()) if resp.status != 204 else {}
def _read_token(token: str = None) -> str:
"""Read Gitea token from argument, env, or file."""
if token:
return token
token = os.environ.get("GITEA_TOKEN", "")
if token:
return token
token_path = Path.home() / ".config" / "gitea" / "token"
if token_path.exists():
return token_path.read_text().strip()
raise ValueError("No Gitea token found. Pass --token, set GITEA_TOKEN, or create ~/.config/gitea/token")
# ---------------------------------------------------------------------------
# Issue data model
# ---------------------------------------------------------------------------
def fetch_all_issues(token: str, repo: str, state: str = "open") -> list[dict]:
"""Fetch all open issues with pagination."""
issues = []
page = 1
while True:
data = _api(token, "GET", f"/repos/{repo}/issues?state={state}&limit={BATCH_SIZE}&page={page}")
if not data:
break
issues.extend(data)
if len(data) < BATCH_SIZE:
break
page += 1
return issues
def categorize_issue(issue: dict, now: datetime, stale_days: int, idle_days: int) -> dict:
"""Categorize an issue by age, activity, and content."""
created = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
updated = datetime.fromisoformat(issue["updated_at"].replace("Z", "+00:00"))
age_days = (now - created).days
idle_days_actual = (now - updated).days
labels = [l["name"] for l in issue.get("labels", [])]
assignees = [a["login"] for a in issue.get("assignees", [])]
comments = issue.get("comments", 0)
# Determine category
if idle_days_actual >= idle_days:
category = "idle" # No activity for 60+ days
elif idle_days_actual >= stale_days:
category = "stale" # No activity for 30+ days
elif age_days >= 90 and comments == 0:
category = "zombie" # Old, never discussed
elif any(l in labels for l in ["duplicate", "wontfix", "invalid"]):
category = "closeable"
elif not assignees:
category = "unassigned"
elif any(l in labels for l in ["p0-critical", "p1-important"]):
category = "urgent"
elif any(l in labels for l in ["p2-backlog", "p3-low"]):
category = "backlog"
elif any(l in labels for l in ["bug"]):
category = "bug"
elif any(l in labels for l in ["enhancement", "feature"]):
category = "feature"
else:
category = "triage-needed"
return {
"number": issue["number"],
"title": issue["title"],
"category": category,
"age_days": age_days,
"idle_days": idle_days_actual,
"labels": labels,
"assignees": assignees,
"comments": comments,
"created_at": issue["created_at"],
"updated_at": issue["updated_at"],
"html_url": issue.get("html_url", ""),
}
# ---------------------------------------------------------------------------
# Triage report
# ---------------------------------------------------------------------------
def generate_report(categorized: list[dict]) -> dict:
"""Generate a triage summary report."""
by_category = {}
for issue in categorized:
cat = issue["category"]
by_category.setdefault(cat, []).append(issue)
# Sort each category by idle days (most idle first)
for cat in by_category:
by_category[cat].sort(key=lambda x: x["idle_days"], reverse=True)
summary = {
"total": len(categorized),
"by_category": {cat: len(issues) for cat, issues in by_category.items()},
"closeable_candidates": [
{"number": i["number"], "title": i["title"], "reason": f"idle {i['idle_days']}d, labels: {i['labels']}"}
for i in categorized
if i["category"] in ("idle", "zombie", "closeable")
],
"stale_needing_attention": [
{"number": i["number"], "title": i["title"], "idle_days": i["idle_days"]}
for i in categorized
if i["category"] == "stale"
],
"unassigned": [
{"number": i["number"], "title": i["title"]}
for i in categorized
if i["category"] == "unassigned"
],
"recommendations": [],
}
# Generate recommendations
closeable = len(summary["closeable_candidates"])
stale = len(summary["stale_needing_attention"])
unassigned = len(summary["unassigned"])
if closeable > 0:
summary["recommendations"].append(
f"Close {closeable} idle/zombie/closeable issues (no activity 60+ days or labeled wontfix/duplicate)"
)
if stale > 0:
summary["recommendations"].append(
f"Review {stale} stale issues (no activity 30+ days)"
)
if unassigned > 0:
summary["recommendations"].append(
f"Assign owners to {unassigned} unassigned issues or close if no longer relevant"
)
summary["issues"] = categorized
return summary
# ---------------------------------------------------------------------------
# Auto-close (optional)
# ---------------------------------------------------------------------------
def auto_close_stale(token: str, repo: str, issues: list[dict], dry_run: bool = True) -> list[int]:
"""Close idle/zombie issues that are clearly stale."""
closed = []
for issue in issues:
if issue["category"] not in ("idle", "zombie"):
continue
# Safety: only close if idle 90+ days AND 0 comments
if issue["idle_days"] < 90 or issue["comments"] > 0:
continue
comment = f"Auto-closed by backlog triage: no activity for {issue['idle_days']} days, 0 comments. Reopen if still relevant."
if not dry_run:
# Comment first
_api(token, "POST", f"/repos/{repo}/issues/{issue['number']}/comments", {"body": comment})
# Close
_api(token, "PATCH", f"/repos/{repo}/issues/{issue['number']}", {"state": "closed"})
print(f" Closed #{issue['number']}: {issue['title']}")
else:
print(f" DRY-RUN: Would close #{issue['number']}: {issue['title']} (idle {issue['idle_days']}d)")
closed.append(issue["number"])
return closed
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(description="Backlog triage tool for Gitea repositories")
parser.add_argument("--repo", required=True, help="Repository (e.g. Timmy_Foundation/the-nexus)")
parser.add_argument("--token", default=None, help="Gitea API token")
parser.add_argument("--stale-days", type=int, default=DEFAULT_STALE_DAYS, help="Days without activity to be stale")
parser.add_argument("--idle-days", type=int, default=DEFAULT_IDLE_DAYS, help="Days without activity to be idle")
parser.add_argument("--report", default=None, help="Output report JSON path")
parser.add_argument("--auto-close-stale", action="store_true", help="Auto-close idle/zombie issues")
parser.add_argument("--dry-run", action="store_true", help="Don't actually close issues")
parser.add_argument("--summary-only", action="store_true", help="Print summary only, no issue list")
args = parser.parse_args()
token = _read_token(args.token)
print(f"Fetching issues from {args.repo}...")
issues = fetch_all_issues(token, args.repo)
print(f"Found {len(issues)} open issues")
now = datetime.now(timezone.utc)
categorized = [categorize_issue(i, now, args.stale_days, args.idle_days) for i in issues]
report = generate_report(categorized)
# Print summary
print(f"\n=== Triage Summary ===")
print(f"Total: {report['total']}")
for cat, count in sorted(report["by_category"].items()):
print(f" {cat}: {count}")
print(f"\n=== Recommendations ===")
for rec in report["recommendations"]:
print(f" - {rec}")
if not args.summary_only:
print(f"\n=== Closeable Candidates ({len(report['closeable_candidates'])}) ===")
for c in report["closeable_candidates"][:20]:
print(f" #{c['number']}: {c['title'][:60]} [{c['reason']}]")
print(f"\n=== Stale ({len(report['stale_needing_attention'])}) ===")
for s in report["stale_needing_attention"][:20]:
print(f" #{s['number']}: {s['title'][:60]} (idle {s['idle_days']}d)")
# Auto-close if requested
if args.auto_close_stale:
print(f"\n=== Auto-close {'(DRY RUN)' if args.dry_run else '(LIVE)'} ===")
closed = auto_close_stale(token, args.repo, categorized, dry_run=args.dry_run)
print(f"{'Would close' if args.dry_run else 'Closed'} {len(closed)} issues")
# Write report
if args.report:
with open(args.report, "w") as f:
json.dump(report, f, indent=2)
print(f"\nReport written to {args.report}")
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -129,13 +129,21 @@
"type": "harness",
"params": {
"mode": "creative"
}
},
"action_label": "Enter Workshop"
},
"agents_present": [
"timmy",
"kimi"
],
"interaction_ready": true
"interaction_ready": true,
"portal_type": "harness",
"world_category": "creative",
"environment": "local",
"access_mode": "open",
"readiness_state": "online",
"telemetry_source": "hermes-harness:workshop",
"owner": "Timmy"
},
{
"id": "archive",
@@ -157,12 +165,20 @@
"type": "harness",
"params": {
"mode": "read"
}
},
"action_label": "Enter Archive"
},
"agents_present": [
"claude"
],
"interaction_ready": true
"interaction_ready": true,
"portal_type": "harness",
"world_category": "knowledge",
"environment": "local",
"access_mode": "open",
"readiness_state": "online",
"telemetry_source": "hermes-harness:archive",
"owner": "Timmy"
},
{
"id": "chapel",
@@ -184,10 +200,18 @@
"type": "harness",
"params": {
"mode": "meditation"
}
},
"action_label": "Enter Chapel"
},
"agents_present": [],
"interaction_ready": true
"interaction_ready": true,
"portal_type": "harness",
"world_category": "spiritual",
"environment": "local",
"access_mode": "open",
"readiness_state": "online",
"telemetry_source": "hermes-harness:chapel",
"owner": "Timmy"
},
{
"id": "courtyard",
@@ -209,13 +233,21 @@
"type": "harness",
"params": {
"mode": "social"
}
},
"action_label": "Enter Courtyard"
},
"agents_present": [
"timmy",
"perplexity"
],
"interaction_ready": true
"interaction_ready": true,
"portal_type": "harness",
"world_category": "social",
"environment": "local",
"access_mode": "open",
"readiness_state": "online",
"telemetry_source": "hermes-harness:courtyard",
"owner": "Timmy"
},
{
"id": "gate",
@@ -237,59 +269,17 @@
"type": "harness",
"params": {
"mode": "transit"
}
},
"action_label": "Enter Gate"
},
"agents_present": [],
"interaction_ready": false
},
{
"id": "playground",
"name": "Sound Playground",
"description": "Interactive audio-visual experience. Paint with sound, create music visually.",
"status": "online",
"color": "#ff00ff",
"role": "creative",
"position": {
"x": 10,
"y": 0,
"z": 15
},
"rotation": {
"y": -0.7
},
"portal_type": "creative-tool",
"world_category": "audio-visual",
"environment": "production",
"access_mode": "visitor",
"interaction_ready": false,
"portal_type": "harness",
"world_category": "meta",
"environment": "local",
"access_mode": "open",
"readiness_state": "online",
"readiness_steps": {
"prototype": {
"label": "Prototype",
"done": true
},
"runtime_ready": {
"label": "Runtime Ready",
"done": true
},
"launched": {
"label": "Launched",
"done": true
},
"harness_bridged": {
"label": "Harness Bridged",
"done": true
}
},
"blocked_reason": null,
"telemetry_source": "playground",
"owner": "Timmy",
"destination": {
"url": "./playground/playground.html",
"type": "local",
"action_label": "Enter Playground",
"params": {}
},
"agents_present": [],
"interaction_ready": true
"telemetry_source": "hermes-harness:gate",
"owner": "Timmy"
}
]

View File

@@ -7,6 +7,7 @@ the body (Evennia/Morrowind), and the visualization surface.
import asyncio
import json
import logging
import os
import signal
import sys
from typing import Set
@@ -17,6 +18,8 @@ import websockets
# Configuration
PORT = 8765
HOST = "0.0.0.0" # Allow external connections if needed
PORTALS_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), "portals.json")
PORTALS_POLL_INTERVAL = 2.0 # seconds
# Logging setup
logging.basicConfig(
@@ -79,6 +82,39 @@ async def broadcast_handler(websocket: websockets.WebSocketServerProtocol):
clients.discard(websocket)
logger.info(f"Client disconnected {addr}. Total clients: {len(clients)}")
async def watch_portals(stop_event: asyncio.Future):
"""Poll portals.json for changes and broadcast reload to all clients."""
last_mtime = 0.0
try:
last_mtime = os.path.getmtime(PORTALS_FILE)
except OSError:
logger.warning(f"portals.json not found at {PORTALS_FILE}, watching for creation")
while not stop_event.done():
await asyncio.sleep(PORTALS_POLL_INTERVAL)
if stop_event.done():
break
try:
current_mtime = os.path.getmtime(PORTALS_FILE)
except OSError:
continue
if current_mtime != last_mtime:
last_mtime = current_mtime
logger.info("portals.json changed — broadcasting reload")
msg = json.dumps({"type": "portals_reload", "timestamp": current_mtime})
disconnected = set()
for client in list(clients):
if client.open:
try:
await client.send(msg)
except Exception:
disconnected.add(client)
if disconnected:
clients.difference_update(disconnected)
logger.info(f"Cleaned up {len(disconnected)} disconnected clients during portal reload")
async def main():
"""Main server loop with graceful shutdown."""
logger.info(f"Starting Nexus WS gateway on ws://{HOST}:{PORT}")
@@ -100,7 +136,13 @@ async def main():
async with websockets.serve(broadcast_handler, HOST, PORT):
logger.info("Gateway is ready and listening.")
watcher_task = asyncio.create_task(watch_portals(stop))
await stop
watcher_task.cancel()
try:
await watcher_task
except asyncio.CancelledError:
pass
logger.info("Shutting down Nexus WS gateway...")
# Close any remaining client connections (handlers may have already cleaned up)

View File

@@ -1,115 +0,0 @@
"""Tests for backlog_triage — issue categorization and report generation."""
import json
from datetime import datetime, timedelta, timezone
from pathlib import Path
import pytest
import sys
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from bin.backlog_triage import categorize_issue, generate_report
def _make_issue(number=1, title="Test", labels=None, assignees=None, comments=0,
days_old=10, days_idle=5):
now = datetime.now(timezone.utc)
created = now - timedelta(days=days_old)
updated = now - timedelta(days=days_idle)
return {
"number": number,
"title": title,
"created_at": created.isoformat().replace("+00:00", "Z"),
"updated_at": updated.isoformat().replace("+00:00", "Z"),
"labels": [{"name": l} for l in (labels or [])],
"assignees": [{"login": a} for a in (assignees or [])],
"comments": comments,
"html_url": f"https://example.com/{number}",
}
class TestCategorizeIssue:
def test_idle_issue(self):
issue = _make_issue(days_idle=70)
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["category"] == "idle"
def test_stale_issue(self):
issue = _make_issue(days_idle=45)
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["category"] == "stale"
def test_zombie_issue(self):
issue = _make_issue(days_old=100, days_idle=10, comments=0)
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["category"] == "zombie"
def test_unassigned_issue(self):
issue = _make_issue(assignees=[], days_old=5, days_idle=1)
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["category"] == "unassigned"
def test_assigned_issue(self):
issue = _make_issue(assignees=["alice"], days_old=5, days_idle=1)
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["category"] == "triage-needed"
def test_closeable_duplicate(self):
issue = _make_issue(labels=["duplicate"], days_old=5, days_idle=1)
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["category"] == "closeable"
def test_urgent_issue(self):
issue = _make_issue(labels=["p0-critical"], assignees=["bob"])
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["category"] == "urgent"
def test_backlog_issue(self):
issue = _make_issue(labels=["p2-backlog"], assignees=["bob"])
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["category"] == "backlog"
def test_bug_category(self):
issue = _make_issue(labels=["bug"], assignees=["bob"])
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["category"] == "bug"
def test_age_tracking(self):
issue = _make_issue(days_old=42, days_idle=7)
result = categorize_issue(issue, datetime.now(timezone.utc), 30, 60)
assert result["age_days"] >= 41
assert result["idle_days"] >= 6
class TestGenerateReport:
def test_empty_report(self):
report = generate_report([])
assert report["total"] == 0
assert report["by_category"] == {}
def test_report_categorization(self):
issues = [
_make_issue(1, "idle", days_idle=70),
_make_issue(2, "stale", days_idle=40),
_make_issue(3, "recent", assignees=["alice"]),
]
categorized = [categorize_issue(i, datetime.now(timezone.utc), 30, 60) for i in issues]
report = generate_report(categorized)
assert report["total"] == 3
assert "idle" in report["by_category"]
assert "stale" in report["by_category"]
def test_closeable_candidates(self):
issues = [
_make_issue(1, "old zombie", days_old=100, days_idle=100, comments=0),
_make_issue(2, "recent", assignees=["alice"]),
]
categorized = [categorize_issue(i, datetime.now(timezone.utc), 30, 60) for i in issues]
report = generate_report(categorized)
assert len(report["closeable_candidates"]) >= 1
assert report["closeable_candidates"][0]["number"] == 1
def test_recommendations_generated(self):
issues = [_make_issue(1, days_idle=70)]
categorized = [categorize_issue(i, datetime.now(timezone.utc), 30, 60) for i in issues]
report = generate_report(categorized)
assert len(report["recommendations"]) > 0