Compare commits
14 Commits
ci/fix-all
...
fix/glitch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
3a18d0171c | ||
| e3a40be627 | |||
| efb2df8940 | |||
| cf687a5bfa | |||
|
|
c09e54de72 | ||
| 3214437652 | |||
| 95cd259867 | |||
| 5e7bef1807 | |||
| 3d84dd5c27 | |||
| e38e80661c | |||
|
|
b71e365ed6 | ||
| c0c34cbae5 | |||
|
|
8483a6602a | ||
| af9850080a |
@@ -49,7 +49,7 @@ jobs:
|
||||
python-version: '3.11'
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
pip install py_compile flake8
|
||||
pip install flake8
|
||||
- name: Compile-check all Python files
|
||||
run: |
|
||||
find . -name '*.py' -print0 | while IFS= read -r -d '' f; do
|
||||
|
||||
97
bin/tmux-resume.sh
Executable file
97
bin/tmux-resume.sh
Executable file
@@ -0,0 +1,97 @@
|
||||
#!/usr/bin/env bash
|
||||
# ── tmux-resume.sh — Cold-start Session Resume ───────────────────────────
|
||||
# Reads ~/.timmy/tmux-state.json and resumes hermes sessions.
|
||||
# Run at startup to restore pane state after supervisor restart.
|
||||
# ──────────────────────────────────────────────────────────────────────────
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
MANIFEST="${HOME}/.timmy/tmux-state.json"
|
||||
|
||||
if [ ! -f "$MANIFEST" ]; then
|
||||
echo "[tmux-resume] No manifest found at $MANIFEST — starting fresh."
|
||||
exit 0
|
||||
fi
|
||||
|
||||
python3 << 'PYEOF'
|
||||
import json, subprocess, os, sys
|
||||
from datetime import datetime, timezone
|
||||
|
||||
MANIFEST = os.path.expanduser("~/.timmy/tmux-state.json")
|
||||
|
||||
def run(cmd):
    """Execute *cmd* through the shell; return (stripped_stdout, returncode).

    Never raises: any failure (timeout, missing binary, OSError, ...) is
    folded into (error_message, 1) so callers handle every outcome uniformly.
    """
    try:
        proc = subprocess.run(
            cmd, shell=True, capture_output=True, text=True, timeout=30
        )
    except Exception as exc:
        return str(exc), 1
    return proc.stdout.strip(), proc.returncode
|
||||
|
||||
def session_exists(name):
    """Return True if a tmux session called *name* appears to exist.

    Keys off tmux's "can't find session" error text rather than the exit
    code, because `run` folds failures into the captured output string.
    """
    output, _ = run(f"tmux has-session -t '{name}' 2>&1")
    missing = "can't find" in output.lower()
    return not missing
|
||||
|
||||
with open(MANIFEST) as f:
|
||||
state = json.load(f)
|
||||
|
||||
# Compute a human-readable age for the manifest timestamp.
ts = state.get("timestamp", "unknown")
age = "unknown"
try:
    t = datetime.fromisoformat(ts.replace("Z", "+00:00"))
    delta = datetime.now(timezone.utc) - t
    mins = int(delta.total_seconds() / 60)
    if mins < 60:
        age = f"{mins}m ago"
    else:
        age = f"{mins//60}h {mins%60}m ago"
except Exception:
    # Timestamp missing or unparseable — keep "unknown" rather than crash.
    # (Was a bare `except:`, which also swallowed KeyboardInterrupt/SystemExit.)
    pass
|
||||
|
||||
print(f"[tmux-resume] Manifest from {age}: {state['summary']['total_sessions']} sessions, "
|
||||
f"{state['summary']['hermes_panes']} hermes panes")
|
||||
|
||||
# Counters for the final summary / resume log.
restored = 0
skipped = 0

# Walk every recorded pane and re-create hermes sessions that are gone.
for pane in state.get("panes", []):
    if not pane.get("is_hermes"):
        continue

    addr = pane["address"]  # e.g. "BURN:2.3"
    session = addr.split(":")[0]
    session_id = pane.get("session_id")
    profile = pane.get("profile", "default")
    model = pane.get("model", "")
    task = pane.get("task", "")

    # Skip if session already exists (already running)
    if session_exists(session):
        print(f" [skip] {addr} — session '{session}' already exists")
        skipped += 1
        continue

    # Respawn hermes with session resume if we have a session ID
    if session_id:
        print(f" [resume] {addr} — profile={profile} model={model} session={session_id}")
        cmd = f"hermes chat --resume {session_id}"
    else:
        print(f" [start] {addr} — profile={profile} model={model} (no session ID)")
        cmd = f"hermes chat --profile {profile}"

    # Create tmux session and run hermes
    # NOTE(review): `-n` names the first *window*, so the window is literally
    # called "SESSION:0" — looks odd; confirm this is intentional.
    run(f"tmux new-session -d -s '{session}' -n '{session}:0'")
    run(f"tmux send-keys -t '{session}' '{cmd}' Enter")
    restored += 1
|
||||
|
||||
# Write resume log
# Persist a small summary of this resume pass for later inspection.
log = {
    "resumed_at": datetime.now(timezone.utc).isoformat(),
    "manifest_age": age,
    "restored": restored,
    "skipped": skipped,
}
log_path = os.path.expanduser("~/.timmy/tmux-resume.log")
with open(log_path, "w") as f:
    json.dump(log, f, indent=2)

print(f"[tmux-resume] Done: {restored} restored, {skipped} skipped")
|
||||
PYEOF
|
||||
237
bin/tmux-state.sh
Executable file
237
bin/tmux-state.sh
Executable file
@@ -0,0 +1,237 @@
|
||||
#!/usr/bin/env bash
|
||||
# ── tmux-state.sh — Session State Persistence Manifest ───────────────────
|
||||
# Snapshots all tmux pane state to ~/.timmy/tmux-state.json
|
||||
# Run every supervisor cycle. Cold-start reads this manifest to resume.
|
||||
# ──────────────────────────────────────────────────────────────────────────
|
||||
|
||||
set -euo pipefail
|
||||
|
||||
MANIFEST="${HOME}/.timmy/tmux-state.json"
|
||||
mkdir -p "$(dirname "$MANIFEST")"
|
||||
|
||||
python3 << 'PYEOF'
|
||||
import json, subprocess, os, time, re, sys
|
||||
from datetime import datetime, timezone
|
||||
from pathlib import Path
|
||||
|
||||
MANIFEST = os.path.expanduser("~/.timmy/tmux-state.json")
|
||||
|
||||
def run(cmd):
    """Run *cmd* in a shell and return its stripped stdout ("" on any error)."""
    try:
        completed = subprocess.run(
            cmd, shell=True, capture_output=True, text=True, timeout=5
        )
    except Exception:
        # Timeouts / exec failures degrade to empty output by design.
        return ""
    return completed.stdout.strip()
|
||||
|
||||
def get_sessions():
    """Describe every tmux session as a dict (name, windows, timestamps, ...).

    Returns an empty list when tmux is not running (run() yields "").
    """
    fmt = ("#{session_name}|#{session_windows}|#{session_created}"
           "|#{session_attached}|#{session_group}|#{session_id}")
    sessions = []
    for line in run(f"tmux list-sessions -F '{fmt}'").splitlines():
        if not line.strip():
            continue
        fields = line.split("|")
        if len(fields) < 6:
            continue
        name, windows, created, attached, group, sid = fields[:6]
        sessions.append({
            "name": name,
            "windows": int(windows),
            "created_epoch": int(created),
            "created": datetime.fromtimestamp(int(created), tz=timezone.utc).isoformat(),
            "attached": attached == "1",
            "group": group,
            "id": sid,
        })
    return sessions
|
||||
|
||||
def get_panes():
    """Collect per-pane metadata for every pane in every tmux session."""
    fmt = '#{session_name}|#{window_index}|#{pane_index}|#{pane_pid}|#{pane_title}|#{pane_width}x#{pane_height}|#{pane_active}|#{pane_current_command}|#{pane_start_command}|#{pane_tty}|#{pane_id}|#{window_name}|#{session_id}'
    panes = []
    for record in run(f"tmux list-panes -a -F '{fmt}'").splitlines():
        if not record.strip():
            continue
        fields = record.split("|")
        if len(fields) < 13:
            continue
        (session, win, pane, pid, title, size, active, cmd,
         start_cmd, tty, pane_id, win_name, sess_id) = fields[:13]
        # "WxH" geometry string; fall back to 0x0 if malformed.
        if "x" in size:
            width, height = size.split("x")
        else:
            width, height = "0", "0"
        panes.append({
            "session": session,
            "window_index": int(win),
            "window_name": win_name,
            "pane_index": int(pane),
            "pane_id": pane_id,
            "pid": int(pid) if pid.isdigit() else 0,
            "title": title,
            "width": int(width),
            "height": int(height),
            "active": active == "1",
            "command": cmd,
            "start_command": start_cmd,
            "tty": tty,
            "session_id": sess_id,
        })
    return panes
|
||||
|
||||
def extract_hermes_state(pane):
    """Try to extract hermes session info from a pane.

    Heuristics only: inspects the pane title/commands and the last 20
    captured lines for model names, UUID session IDs and a plausible
    "task" line.  Returns a dict with is_hermes / profile / model /
    provider / session_id / task (False/None when nothing matches).
    """
    info = {
        "is_hermes": False,
        "profile": None,
        "model": None,
        "provider": None,
        "session_id": None,
        "task": None,
    }
    title = pane.get("title", "")
    cmd = pane.get("command", "")
    start = pane.get("start_command", "")

    # Detect hermes processes by keyword in title/command strings.
    is_hermes = any(k in (title + " " + cmd + " " + start).lower()
                    for k in ["hermes", "timmy", "mimo", "claude", "gpt"])
    # Plain interpreters/shells may still be hosting hermes, so they fall
    # through to the content scan below; anything else bails out early.
    if not is_hermes and cmd not in ("python3", "python3.11", "bash", "zsh", "fish"):
        return info

    # Try reading pane content for model/provider clues
    pane_content = run(f"tmux capture-pane -t '{pane['session']}:{pane['window_index']}.{pane['pane_index']}' -p -S -20 2>/dev/null")

    # Extract model from pane content patterns
    model_patterns = [
        r"(?:mimo-v2-pro|claude-[\w.-]+|gpt-[\w.-]+|gemini-[\w.-]+|qwen[\w:.-]*)",
    ]
    for pat in model_patterns:
        m = re.search(pat, pane_content, re.IGNORECASE)
        if m:
            # Seeing a model name is treated as proof of a hermes pane.
            info["model"] = m.group(0)
            info["is_hermes"] = True
            break

    # Provider inference from model
    model = (info["model"] or "").lower()
    if "mimo" in model:
        info["provider"] = "nous"
    elif "claude" in model:
        info["provider"] = "anthropic"
    elif "gpt" in model:
        info["provider"] = "openai"
    elif "gemini" in model:
        info["provider"] = "google"
    elif "qwen" in model:
        info["provider"] = "custom"

    # Profile from session name
    session = pane["session"].lower()
    if "burn" in session:
        info["profile"] = "burn"
    elif session in ("dev", "0"):
        info["profile"] = "default"
    else:
        info["profile"] = session

    # Try to extract session ID (hermes uses UUIDs)
    uuid_match = re.findall(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', pane_content)
    if uuid_match:
        info["session_id"] = uuid_match[-1]  # most recent
        info["is_hermes"] = True

    # Last prompt — grab the last user-like line
    # (skips TUI border/bullet glyphs; >10 chars filters out noise lines)
    lines = pane_content.splitlines()
    for line in reversed(lines):
        stripped = line.strip()
        if stripped and not stripped.startswith(("─", "│", "╭", "╰", "▸", "●", "○")) and len(stripped) > 10:
            info["task"] = stripped[:200]
            break

    return info
|
||||
|
||||
def get_context_percent(pane):
|
||||
"""Estimate context usage from pane content heuristics."""
|
||||
content = run(f"tmux capture-pane -t '{pane['session']}:{pane['window_index']}.{pane['pane_index']}' -p -S -5 2>/dev/null")
|
||||
# Look for context indicators like "ctx 45%" or "[░░░░░░░░░░]"
|
||||
ctx_match = re.search(r'ctx\s*(\d+)%', content)
|
||||
if ctx_match:
|
||||
return int(ctx_match.group(1))
|
||||
bar_match = re.search(r'\[(░+█*█*░*)\]', content)
|
||||
if bar_match:
|
||||
bar = bar_match.group(1)
|
||||
filled = bar.count('█')
|
||||
total = len(bar)
|
||||
if total > 0:
|
||||
return int((filled / total) * 100)
|
||||
return None
|
||||
|
||||
def build_manifest():
    """Build the full tmux state manifest.

    Combines the session list, per-pane metadata, hermes heuristics and the
    context-usage estimate into one JSON-serializable dict with a summary
    section for quick supervisor checks.
    """
    now = datetime.now(timezone.utc)
    sessions = get_sessions()
    panes = get_panes()

    pane_manifests = []
    for p in panes:
        hermes = extract_hermes_state(p)
        ctx = get_context_percent(p)

        # Flatten raw pane info + hermes heuristics into one flat record.
        entry = {
            "address": f"{p['session']}:{p['window_index']}.{p['pane_index']}",
            "pane_id": p["pane_id"],
            "pid": p["pid"],
            "size": f"{p['width']}x{p['height']}",
            "active": p["active"],
            "command": p["command"],
            "title": p["title"],
            "profile": hermes["profile"],
            "model": hermes["model"],
            "provider": hermes["provider"],
            "session_id": hermes["session_id"],
            "task": hermes["task"],
            "context_pct": ctx,
            "is_hermes": hermes["is_hermes"],
        }
        pane_manifests.append(entry)

    # Active pane summary
    # (first active pane is treated as "primary"; {} when none are active)
    active_panes = [p for p in pane_manifests if p["active"]]
    primary = active_panes[0] if active_panes else {}

    manifest = {
        "version": 1,
        "timestamp": now.isoformat(),
        "timestamp_epoch": int(now.timestamp()),
        "hostname": os.uname().nodename,
        "sessions": sessions,
        "panes": pane_manifests,
        "summary": {
            "total_sessions": len(sessions),
            "total_panes": len(pane_manifests),
            "hermes_panes": sum(1 for p in pane_manifests if p["is_hermes"]),
            "active_pane": primary.get("address"),
            "active_model": primary.get("model"),
            "active_provider": primary.get("provider"),
        },
    }

    return manifest
|
||||
|
||||
# --- Main ---
manifest = build_manifest()

# Write manifest
with open(MANIFEST, "w") as f:
    json.dump(manifest, f, indent=2)

# Also write to ~/.hermes/tmux-state.json for compatibility
hermes_manifest = os.path.expanduser("~/.hermes/tmux-state.json")
os.makedirs(os.path.dirname(hermes_manifest), exist_ok=True)
with open(hermes_manifest, "w") as f:
    json.dump(manifest, f, indent=2)

# One-line status for supervisor logs.
print(f"[tmux-state] {manifest['summary']['total_panes']} panes, "
      f"{manifest['summary']['hermes_panes']} hermes, "
      f"active={manifest['summary']['active_pane']} "
      f"@ {manifest['summary']['active_model']}")
print(f"[tmux-state] written to {MANIFEST}")
|
||||
PYEOF
|
||||
@@ -7,7 +7,7 @@ on:
|
||||
branches: [main]
|
||||
|
||||
concurrency:
|
||||
group: forge-ci-${{ gitea.ref }}
|
||||
group: forge-ci-${{ github.ref }}
|
||||
cancel-in-progress: true
|
||||
|
||||
jobs:
|
||||
@@ -18,40 +18,21 @@ jobs:
|
||||
- name: Checkout code
|
||||
uses: actions/checkout@v4
|
||||
|
||||
- name: Install uv
|
||||
uses: astral-sh/setup-uv@v5
|
||||
with:
|
||||
enable-cache: true
|
||||
cache-dependency-glob: "uv.lock"
|
||||
|
||||
- name: Set up Python 3.11
|
||||
run: uv python install 3.11
|
||||
uses: actions/setup-python@v5
|
||||
with:
|
||||
python-version: '3.11'
|
||||
|
||||
- name: Install package
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
uv venv .venv --python 3.11
|
||||
source .venv/bin/activate
|
||||
uv pip install -e ".[all,dev]"
|
||||
pip install pytest pyyaml
|
||||
|
||||
- name: Smoke tests
|
||||
run: |
|
||||
source .venv/bin/activate
|
||||
python scripts/smoke_test.py
|
||||
run: python scripts/smoke_test.py
|
||||
env:
|
||||
OPENROUTER_API_KEY: ""
|
||||
OPENAI_API_KEY: ""
|
||||
NOUS_API_KEY: ""
|
||||
|
||||
- name: Syntax guard
|
||||
run: |
|
||||
source .venv/bin/activate
|
||||
python scripts/syntax_guard.py
|
||||
|
||||
- name: Green-path E2E
|
||||
run: |
|
||||
source .venv/bin/activate
|
||||
python -m pytest tests/test_green_path_e2e.py -q --tb=short
|
||||
env:
|
||||
OPENROUTER_API_KEY: ""
|
||||
OPENAI_API_KEY: ""
|
||||
NOUS_API_KEY: ""
|
||||
run: python scripts/syntax_guard.py
|
||||
|
||||
@@ -22,7 +22,7 @@ jobs:
|
||||
|
||||
- name: Install dependencies
|
||||
run: |
|
||||
pip install papermill jupytext nbformat
|
||||
pip install papermill jupytext nbformat ipykernel
|
||||
python -m ipykernel install --user --name python3
|
||||
|
||||
- name: Execute system health notebook
|
||||
|
||||
@@ -1,12 +1,599 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
matrix_glitch_detect.py — 3D World Visual Artifact Detection for The Matrix.
|
||||
|
||||
Scans screenshots or live pages for visual glitches: floating assets, z-fighting,
|
||||
texture pop-in, clipping, broken meshes, lighting artifacts. Outputs structured
|
||||
JSON, text, or standalone HTML report with annotated screenshots.
|
||||
|
||||
Usage:
|
||||
# Scan a screenshot
|
||||
python scripts/matrix_glitch_detect.py --image screenshot.png
|
||||
|
||||
# Scan with vision model
|
||||
python scripts/matrix_glitch_detect.py --image screenshot.png --vision
|
||||
|
||||
# HTML report
|
||||
python scripts/matrix_glitch_detect.py --image screenshot.png --html report.html
|
||||
|
||||
# Scan live Matrix page
|
||||
python scripts/matrix_glitch_detect.py --url https://matrix.alexanderwhitestone.com
|
||||
|
||||
# Batch scan a directory
|
||||
python scripts/matrix_glitch_detect.py --batch ./screenshots/ --html batch-report.html
|
||||
|
||||
Refs: timmy-config#491, #541, #543, #544
|
||||
"""
|
||||
|
||||
from __future__ import annotations

import argparse
import base64
import html as html_module
import json
import os
import subprocess
import sys
import time
import urllib.error
import urllib.request
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Optional

from hermes_tools import browser_navigate, browser_vision
|
||||
|
||||
def detect_glitches_live(url: str = "https://matrix.alexanderwhitestone.com") -> dict:
    """Run a live, browser-based glitch scan of the Matrix page.

    NOTE(review): the original source contained an unterminated duplicate
    ``detect_glitches()`` here — a merge artifact whose ``return`` and
    ``__main__`` lines were spliced into the image-based ``detect_glitches``
    further down.  This completes that fragment under a distinct name so it
    no longer collides with the main entry point.

    Returns a dict with "status" ("PASS"/"FAIL") and the raw vision
    "analysis" text.
    """
    browser_navigate(url=url)
    analysis = browser_vision(
        question="Scan the 3D world for visual artifacts, floating assets, or z-fighting. List all coordinates/descriptions of glitches found. Provide a PASS/FAIL."
    )
    return {"status": "PASS" if "PASS" in analysis.upper() else "FAIL", "analysis": analysis}
|
||||
|
||||
# === Configuration ===
|
||||
|
||||
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
|
||||
VISION_MODEL = os.environ.get("VISUAL_REVIEW_MODEL", "gemma3:12b")
|
||||
|
||||
|
||||
class Severity(str, Enum):
    """Severity ladder for detected glitches.

    str-valued so instances serialize as their plain string value in JSON.
    In detect_glitches: any CRITICAL forces status FAIL; any MAJOR forces
    at least WARN.
    """
    CRITICAL = "critical"
    MAJOR = "major"
    MINOR = "minor"
    COSMETIC = "cosmetic"
||||
|
||||
|
||||
@dataclass
class Glitch:
    """A single detected visual artifact found in one screenshot."""
    type: str = ""  # floating_asset, z_fighting, texture_pop, clipping, lighting, mesh_break
    severity: Severity = Severity.MINOR
    region: str = ""  # "upper-left", "center", "bottom-right", or coordinates
    description: str = ""  # human-readable explanation of the artifact
    confidence: float = 0.0  # 0.0-1.0
    source: str = ""  # "programmatic", "vision", "pixel_analysis"
|
||||
|
||||
|
||||
@dataclass
class GlitchReport:
    """Complete glitch detection report for one screenshot or URL."""
    source: str = ""  # file path or URL
    timestamp: str = ""  # ISO-8601, set by detect_glitches
    status: str = "PASS"  # PASS, WARN, FAIL
    score: int = 100  # 0-100 quality score (higher = cleaner)
    glitches: list[Glitch] = field(default_factory=list)
    summary: str = ""  # one-line human-readable verdict
    model_used: str = ""  # vision model name, or "none"
    width: int = 0  # image dimensions; 0 when unknown (e.g. PIL absent)
    height: int = 0
|
||||
|
||||
|
||||
# === Programmatic Analysis ===
|
||||
|
||||
def analyze_pixels(image_path: str) -> list[Glitch]:
    """Programmatic pixel analysis for common 3D glitches.

    Cheap heuristics, no ML: an all-black frame (render failure),
    mostly-solid scanlines (tearing), and extreme global brightness
    variance (lighting).  Returns whatever was detected before the first
    error; a missing PIL disables the check entirely.
    """
    glitches: list[Glitch] = []

    try:
        import statistics
        from PIL import Image

        img = Image.open(image_path).convert("RGB")
        w, h = img.size
        pixels = img.load()

        # Check for solid-color regions (render failure): all four corners
        # are the exact same solid black.
        corner_colors = [
            pixels[0, 0], pixels[w - 1, 0], pixels[0, h - 1], pixels[w - 1, h - 1]
        ]
        if all(c == corner_colors[0] for c in corner_colors) and corner_colors[0] == (0, 0, 0):
            glitches.append(Glitch(
                type="render_failure",
                severity=Severity.CRITICAL,
                region="entire frame",
                description="Entire frame is black — 3D scene failed to render",
                confidence=0.9,
                source="pixel_analysis"
            ))

        # Check for horizontal tearing lines: sample ~20 rows and flag
        # rows that are >95% a single color.
        tear_count = 0
        for y in range(0, h, max(1, h // 20)):
            row_start = pixels[0, y]
            same_count = sum(1 for x in range(w) if pixels[x, y] == row_start)
            if same_count > w * 0.95:
                tear_count += 1
        if tear_count > 3:
            glitches.append(Glitch(
                type="horizontal_tear",
                severity=Severity.MAJOR,
                region=f"{tear_count} lines",
                description=f"Horizontal tearing detected — {tear_count} mostly-solid scanlines",
                confidence=0.7,
                source="pixel_analysis"
            ))

        # Check for extreme brightness variance (lighting artifacts) over
        # a ~50x50 sampling grid, using Rec.601 luma weights.
        brightness_samples = []
        for y in range(0, h, max(1, h // 50)):
            for x in range(0, w, max(1, w // 50)):
                r, g, b = pixels[x, y]
                brightness_samples.append(0.299 * r + 0.587 * g + 0.114 * b)
        # BUG FIX: statistics.stdev raises StatisticsError on < 2 points,
        # which the old broad `except` silently swallowed.
        if len(brightness_samples) >= 2:
            stdev = statistics.stdev(brightness_samples)
            if stdev > 100:
                glitches.append(Glitch(
                    type="lighting",
                    severity=Severity.MINOR,
                    region="global",
                    description=f"Extreme brightness variance (stdev={stdev:.0f}) — possible lighting artifacts",
                    confidence=0.5,
                    source="pixel_analysis"
                ))

    except ImportError:
        pass  # PIL not available — skip pixel heuristics entirely
    except Exception as e:
        # Best-effort: keep whatever was already found, but surface the
        # failure instead of silently discarding it (was `as e: pass`).
        print(f"  pixel analysis aborted: {e}", file=sys.stderr)

    return glitches
|
||||
|
||||
|
||||
# === Vision Analysis ===
|
||||
|
||||
GLITCH_VISION_PROMPT = """You are a 3D world QA engineer. Analyze this screenshot from a Three.js 3D world (The Matrix) for visual glitches and artifacts.
|
||||
|
||||
Look for these specific issues:
|
||||
|
||||
1. FLOATING ASSETS: Objects hovering above surfaces where they should rest. Look for shadows detached from objects.
|
||||
|
||||
2. Z-FIGHTING: Flickering or shimmering surfaces where two polygons overlap at the same depth. Usually appears as striped or dithered patterns.
|
||||
|
||||
3. TEXTURE POP-IN: Low-resolution textures that haven't loaded, or textures that suddenly change quality between frames.
|
||||
|
||||
4. CLIPPING: Objects passing through walls, floors, or other objects. Characters partially inside geometry.
|
||||
|
||||
5. LIGHTING ARTIFACTS: Hard light seams, black patches, overexposed areas, lights not illuminating correctly.
|
||||
|
||||
6. MESH BREAKS: Visible seams in geometry, missing faces on 3D objects, holes in surfaces.
|
||||
|
||||
7. RENDER FAILURE: Black areas where geometry should be, missing skybox, incomplete frame rendering.
|
||||
|
||||
8. UI OVERLAP: UI elements overlapping 3D viewport incorrectly.
|
||||
|
||||
Respond as JSON:
|
||||
{
|
||||
"glitches": [
|
||||
{
|
||||
"type": "floating_asset|z_fighting|texture_pop|clipping|lighting|mesh_break|render_failure|ui_overlap",
|
||||
"severity": "critical|major|minor|cosmetic",
|
||||
"region": "description of where",
|
||||
"description": "detailed description of the artifact",
|
||||
"confidence": 0.0-1.0
|
||||
}
|
||||
],
|
||||
"overall_quality": 0-100,
|
||||
"summary": "brief assessment"
|
||||
}"""
|
||||
|
||||
|
||||
def run_vision_analysis(image_path: str, model: str = VISION_MODEL) -> tuple[list[Glitch], int]:
    """Ask the Ollama vision model to grade a screenshot for glitches.

    Returns (glitches, overall_quality).  Any failure — file read, network,
    model, JSON — degrades to ([], 50) with a note on stderr.
    """
    try:
        encoded = base64.b64encode(Path(image_path).read_bytes()).decode()
        body = {
            "model": model,
            "messages": [{"role": "user", "content": [
                {"type": "text", "text": GLITCH_VISION_PROMPT},
                {"type": "image_url", "image_url": {"url": f"data:image/png;base64,{encoded}"}}
            ]}],
            "stream": False,
            "options": {"temperature": 0.1}
        }

        request = urllib.request.Request(
            f"{OLLAMA_BASE}/api/chat",
            data=json.dumps(body).encode(),
            headers={"Content-Type": "application/json"}
        )
        with urllib.request.urlopen(request, timeout=120) as resp:
            reply = json.loads(resp.read())
        answer = reply.get("message", {}).get("content", "")

        # Convert the model's JSON into Glitch records.
        parsed = _parse_json_response(answer)
        found = [
            Glitch(
                type=item.get("type", "unknown"),
                severity=Severity(item.get("severity", "minor")),
                region=item.get("region", ""),
                description=item.get("description", ""),
                confidence=float(item.get("confidence", 0.5)),
                source="vision"
            )
            for item in parsed.get("glitches", [])
        ]
        return found, parsed.get("overall_quality", 80)

    except Exception as e:
        print(f" Vision analysis failed: {e}", file=sys.stderr)
        return [], 50
|
||||
|
||||
|
||||
def _parse_json_response(text: str) -> dict:
|
||||
cleaned = text.strip()
|
||||
if cleaned.startswith("```"):
|
||||
lines = cleaned.split("\n")[1:]
|
||||
if lines and lines[-1].strip() == "```":
|
||||
lines = lines[:-1]
|
||||
cleaned = "\n".join(lines)
|
||||
try:
|
||||
return json.loads(cleaned)
|
||||
except json.JSONDecodeError:
|
||||
start = cleaned.find("{")
|
||||
end = cleaned.rfind("}")
|
||||
if start >= 0 and end > start:
|
||||
try:
|
||||
return json.loads(cleaned[start:end + 1])
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
# === Screenshot Capture ===
|
||||
|
||||
def capture_screenshot(url: str, output_path: str) -> bool:
    """Take a screenshot of a URL with headless Playwright.

    Runs Playwright in a child python3 process so a missing or broken
    install cannot take down this process.  Returns True only if the child
    exited 0 AND the output file exists.

    BUG FIX: the module never imported ``subprocess``, so this function
    always raised NameError (caught below) and returned False.
    """
    import subprocess  # local import keeps this fix self-contained

    try:
        script = f"""
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    page = browser.new_page(viewport={{"width": 1280, "height": 720}})
    page.goto("{url}", wait_until="networkidle", timeout=30000)
    page.wait_for_timeout(3000)
    page.screenshot(path="{output_path}")
    browser.close()
"""
        result = subprocess.run(["python3", "-c", script], capture_output=True, text=True, timeout=60)
        return result.returncode == 0 and Path(output_path).exists()
    except Exception:
        return False
|
||||
|
||||
|
||||
# === Detection Logic ===
|
||||
|
||||
def detect_glitches(image_path: str, use_vision: bool = False,
                    model: str = VISION_MODEL) -> GlitchReport:
    """Run full glitch detection on an image.

    Pipeline: existence check -> image dimensions -> programmatic pixel
    heuristics -> optional vision-model pass -> score + PASS/WARN/FAIL.

    NOTE(review): the original body had two stray fragments spliced into
    it by a bad merge — an early ``return {...}`` referencing an undefined
    ``analysis`` variable, and an ``if __name__ == '__main__'`` block that
    called a zero-argument detect_glitches().  Both are removed here; the
    intended flow is otherwise unchanged.
    """
    report = GlitchReport(
        source=image_path,
        timestamp=datetime.now().isoformat(),
        model_used=model if use_vision else "none"
    )

    if not Path(image_path).exists():
        report.status = "FAIL"
        report.summary = f"File not found: {image_path}"
        report.score = 0
        return report

    # Get image dimensions (best-effort; PIL may be absent)
    try:
        from PIL import Image
        img = Image.open(image_path)
        report.width, report.height = img.size
    except Exception:
        pass

    # Programmatic analysis
    prog_glitches = analyze_pixels(image_path)
    report.glitches.extend(prog_glitches)

    # Vision analysis (sets the score); otherwise score from severity counts
    if use_vision:
        print(f" Running vision analysis on {image_path}...", file=sys.stderr)
        vision_glitches, quality = run_vision_analysis(image_path, model)
        report.glitches.extend(vision_glitches)
        report.score = quality
    else:
        criticals = sum(1 for g in report.glitches if g.severity == Severity.CRITICAL)
        majors = sum(1 for g in report.glitches if g.severity == Severity.MAJOR)
        report.score = max(0, 100 - criticals * 40 - majors * 15)

    # Determine status
    criticals = sum(1 for g in report.glitches if g.severity == Severity.CRITICAL)
    majors = sum(1 for g in report.glitches if g.severity == Severity.MAJOR)

    if criticals > 0:
        report.status = "FAIL"
    elif majors > 0 or report.score < 70:
        report.status = "WARN"
    else:
        report.status = "PASS"

    report.summary = (
        f"{report.status}: {len(report.glitches)} glitch(es) found "
        f"({criticals} critical, {majors} major), score {report.score}/100"
    )

    return report
|
||||
|
||||
|
||||
# === HTML Report Generator ===
|
||||
|
||||
def generate_html_report(reports: list[GlitchReport], title: str = "Glitch Detection Report") -> str:
|
||||
"""Generate a standalone HTML report with annotated details."""
|
||||
total_glitches = sum(len(r.glitches) for r in reports)
|
||||
total_criticals = sum(sum(1 for g in r.glitches if g.severity == Severity.CRITICAL) for r in reports)
|
||||
avg_score = sum(r.score for r in reports) // max(1, len(reports))
|
||||
|
||||
if total_criticals > 0:
|
||||
overall_verdict = "FAIL"
|
||||
verdict_color = "#f44336"
|
||||
elif any(r.status == "WARN" for r in reports):
|
||||
overall_verdict = "WARN"
|
||||
verdict_color = "#ff9800"
|
||||
else:
|
||||
overall_verdict = "PASS"
|
||||
verdict_color = "#4caf50"
|
||||
|
||||
# Build HTML
|
||||
parts = []
|
||||
parts.append(f"""<!DOCTYPE html>
|
||||
<html lang="en">
|
||||
<head>
|
||||
<meta charset="UTF-8">
|
||||
<meta name="viewport" content="width=device-width, initial-scale=1.0">
|
||||
<title>{html_module.escape(title)}</title>
|
||||
<style>
|
||||
*{{margin:0;padding:0;box-sizing:border-box}}
|
||||
body{{font-family:-apple-system,BlinkMacSystemFont,'Segoe UI',Roboto,monospace;background:#0a0a14;color:#c0c0d0;font-size:13px;line-height:1.5}}
|
||||
.container{{max-width:1000px;margin:0 auto;padding:20px}}
|
||||
header{{text-align:center;padding:24px 0;border-bottom:1px solid #1a1a2e;margin-bottom:24px}}
|
||||
header h1{{font-size:20px;font-weight:300;letter-spacing:3px;color:#4a9eff;margin-bottom:8px}}
|
||||
.verdict{{display:inline-block;padding:6px 20px;border-radius:4px;font-size:14px;font-weight:700;letter-spacing:2px;color:#fff;background:{verdict_color}}}
|
||||
.stats{{display:flex;gap:16px;justify-content:center;margin:16px 0;flex-wrap:wrap}}
|
||||
.stat{{background:#0e0e1a;border:1px solid #1a1a2e;border-radius:4px;padding:8px 16px;text-align:center}}
|
||||
.stat .val{{font-size:20px;font-weight:700;color:#4a9eff}}
|
||||
.stat .lbl{{font-size:9px;color:#666;text-transform:uppercase;letter-spacing:1px}}
|
||||
.score-gauge{{width:120px;height:120px;margin:0 auto 16px;position:relative}}
|
||||
.score-gauge svg{{transform:rotate(-90deg)}}
|
||||
.score-gauge .score-text{{position:absolute;top:50%;left:50%;transform:translate(-50%,-50%);font-size:28px;font-weight:700}}
|
||||
.report-card{{background:#0e0e1a;border:1px solid #1a1a2e;border-radius:6px;margin-bottom:16px;overflow:hidden}}
|
||||
.report-header{{padding:12px 16px;border-bottom:1px solid #1a1a2e;display:flex;justify-content:space-between;align-items:center}}
|
||||
.report-header .source{{color:#4a9eff;font-weight:600;word-break:break-all}}
|
||||
.report-header .status-badge{{padding:2px 10px;border-radius:3px;font-size:11px;font-weight:700;color:#fff}}
|
||||
.status-pass{{background:#4caf50}}
|
||||
.status-warn{{background:#ff9800}}
|
||||
.status-fail{{background:#f44336}}
|
||||
.screenshot{{text-align:center;padding:12px;background:#080810}}
|
||||
.screenshot img{{max-width:100%;max-height:400px;border:1px solid #1a1a2e;border-radius:4px}}
|
||||
.glitch-list{{padding:12px 16px}}
|
||||
.glitch-item{{padding:8px 0;border-bottom:1px solid #111;display:flex;gap:12px;align-items:flex-start}}
|
||||
.glitch-item:last-child{{border-bottom:none}}
|
||||
.severity-dot{{width:8px;height:8px;border-radius:50%;margin-top:5px;flex-shrink:0}}
|
||||
.sev-critical{{background:#f44336}}
|
||||
.sev-major{{background:#ff9800}}
|
||||
.sev-minor{{background:#2196f3}}
|
||||
.sev-cosmetic{{background:#666}}
|
||||
.glitch-detail{{flex:1}}
|
||||
.glitch-type{{color:#ffd700;font-weight:600;font-size:11px;text-transform:uppercase;letter-spacing:1px}}
|
||||
.glitch-desc{{color:#aaa;font-size:12px;margin-top:2px}}
|
||||
.glitch-meta{{color:#555;font-size:10px;margin-top:2px}}
|
||||
.no-glitches{{color:#4caf50;text-align:center;padding:20px;font-style:italic}}
|
||||
footer{{text-align:center;padding:16px;color:#444;font-size:10px;border-top:1px solid #1a1a2e;margin-top:24px}}
|
||||
</style>
|
||||
</head>
|
||||
<body>
|
||||
<div class="container">
|
||||
<header>
|
||||
<h1>{html_module.escape(title)}</h1>
|
||||
<div class="verdict">{overall_verdict}</div>
|
||||
<div class="stats">
|
||||
<div class="stat"><div class="val">{len(reports)}</div><div class="lbl">Screenshots</div></div>
|
||||
<div class="stat"><div class="val">{total_glitches}</div><div class="lbl">Glitches</div></div>
|
||||
<div class="stat"><div class="val">{total_criticals}</div><div class="lbl">Critical</div></div>
|
||||
<div class="stat"><div class="val">{avg_score}</div><div class="lbl">Avg Score</div></div>
|
||||
</div>
|
||||
</header>
|
||||
""")
|
||||
|
||||
# Score gauge
|
||||
score_color = "#4caf50" if avg_score >= 80 else "#ff9800" if avg_score >= 60 else "#f44336"
|
||||
circumference = 2 * 3.14159 * 50
|
||||
dash_offset = circumference * (1 - avg_score / 100)
|
||||
parts.append(f"""
|
||||
<div class="score-gauge">
|
||||
<svg width="120" height="120" viewBox="0 0 120 120">
|
||||
<circle cx="60" cy="60" r="50" fill="none" stroke="#1a1a2e" stroke-width="8"/>
|
||||
<circle cx="60" cy="60" r="50" fill="none" stroke="{score_color}" stroke-width="8"
|
||||
stroke-dasharray="{circumference}" stroke-dashoffset="{dash_offset}" stroke-linecap="round"/>
|
||||
</svg>
|
||||
<div class="score-text" style="color:{score_color}">{avg_score}</div>
|
||||
</div>
|
||||
""")
|
||||
|
||||
# Per-screenshot reports
|
||||
for i, report in enumerate(reports):
|
||||
status_class = f"status-{report.status.lower()}"
|
||||
source_name = Path(report.source).name if report.source else f"Screenshot {i+1}"
|
||||
|
||||
# Inline screenshot as base64
|
||||
img_tag = ""
|
||||
if report.source and Path(report.source).exists():
|
||||
try:
|
||||
b64 = base64.b64encode(Path(report.source).read_bytes()).decode()
|
||||
ext = Path(report.source).suffix.lower()
|
||||
mime = "image/png" if ext == ".png" else "image/jpeg" if ext in (".jpg", ".jpeg") else "image/webp"
|
||||
img_tag = f'<img src="data:{mime};base64,{b64}" alt="Screenshot">'
|
||||
except Exception:
|
||||
img_tag = '<div style="color:#666;padding:40px">Screenshot unavailable</div>'
|
||||
else:
|
||||
img_tag = '<div style="color:#666;padding:40px">No screenshot</div>'
|
||||
|
||||
parts.append(f"""
|
||||
<div class="report-card">
|
||||
<div class="report-header">
|
||||
<span class="source">{html_module.escape(source_name)} ({report.width}x{report.height})</span>
|
||||
<span class="status-badge {status_class}">{report.status} — {report.score}/100</span>
|
||||
</div>
|
||||
<div class="screenshot">{img_tag}</div>
|
||||
""")
|
||||
|
||||
if report.glitches:
|
||||
parts.append('<div class="glitch-list">')
|
||||
for g in sorted(report.glitches, key=lambda x: {"critical": 0, "major": 1, "minor": 2, "cosmetic": 3}.get(x.severity.value if hasattr(x.severity, "value") else str(x.severity), 4)):
|
||||
sev = g.severity.value if hasattr(g.severity, "value") else str(g.severity)
|
||||
sev_class = f"sev-{sev}"
|
||||
parts.append(f"""
|
||||
<div class="glitch-item">
|
||||
<div class="severity-dot {sev_class}"></div>
|
||||
<div class="glitch-detail">
|
||||
<div class="glitch-type">{html_module.escape(g.type)} — {sev.upper()}</div>
|
||||
<div class="glitch-desc">{html_module.escape(g.description)}</div>
|
||||
<div class="glitch-meta">Region: {html_module.escape(g.region)} | Confidence: {g.confidence:.0%} | Source: {html_module.escape(g.source)}</div>
|
||||
</div>
|
||||
</div>""")
|
||||
parts.append('</div>')
|
||||
else:
|
||||
parts.append('<div class="no-glitches">No glitches detected</div>')
|
||||
|
||||
parts.append('</div><!-- /report-card -->')
|
||||
|
||||
# Footer
|
||||
parts.append(f"""
|
||||
<footer>
|
||||
Generated {datetime.now().strftime('%Y-%m-%d %H:%M:%S')} | matrix_glitch_detect.py | timmy-config#544
|
||||
</footer>
|
||||
</div>
|
||||
</body>
|
||||
</html>""")
|
||||
|
||||
return "\n".join(parts)
|
||||
|
||||
|
||||
# === Output Formatting ===
|
||||
|
||||
def format_report(report: GlitchReport, fmt: str = "json") -> str:
    """Render a GlitchReport as a string.

    Args:
        report: the report to render.
        fmt: "json" for machine-readable output, "text" for a boxed
            human-readable summary.

    Returns:
        The formatted report, or an empty string for an unknown format.
    """
    if fmt == "json":
        glitch_dicts = [asdict(g) for g in report.glitches]
        # asdict() keeps Severity enum members intact — flatten them to their
        # string values so json.dumps can serialize the payload.
        for entry in glitch_dicts:
            if hasattr(entry["severity"], "value"):
                entry["severity"] = entry["severity"].value
        payload = {
            "source": report.source,
            "timestamp": report.timestamp,
            "status": report.status,
            "score": report.score,
            "glitches": glitch_dicts,
            "summary": report.summary,
            "model_used": report.model_used,
        }
        return json.dumps(payload, indent=2)

    if fmt == "text":
        icons = {"critical": "🔴", "major": "🟡", "minor": "🔵", "cosmetic": "⚪"}
        lines = []
        lines.append("=" * 50)
        lines.append(" GLITCH DETECTION REPORT")
        lines.append("=" * 50)
        lines.append(f" Source: {report.source}")
        lines.append(f" Status: {report.status}")
        lines.append(f" Score: {report.score}/100")
        lines.append(f" Glitches: {len(report.glitches)}")
        lines.append("")
        for g in report.glitches:
            # Severity may be an enum or already a plain string.
            sev = g.severity.value if hasattr(g.severity, "value") else str(g.severity)
            icon = icons.get(sev, "?")
            lines.append(f" {icon} [{g.type}] {sev.upper()}: {g.description}")
            lines.append(f"    Region: {g.region} | Confidence: {g.confidence:.0%}")
            lines.append("")
        lines.append(f" {report.summary}")
        lines.append("=" * 50)
        return "\n".join(lines)

    return ""
|
||||
|
||||
|
||||
# === CLI ===
|
||||
|
||||
def main():
    """CLI entry point: analyze one image, a URL capture, or a batch directory.

    Exits with status 1 when the screenshot capture fails or any report has
    status FAIL, so CI pipelines can gate on the result.
    """
    parser = argparse.ArgumentParser(
        description="3D World Glitch Detection — visual artifact scanner for The Matrix"
    )
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--image", help="Screenshot file to analyze")
    group.add_argument("--url", help="URL to screenshot and analyze")
    group.add_argument("--batch", help="Directory of screenshots to analyze")

    parser.add_argument("--vision", action="store_true", help="Include vision model analysis")
    parser.add_argument("--model", default=VISION_MODEL, help=f"Vision model (default: {VISION_MODEL})")
    parser.add_argument("--html", help="Generate HTML report at this path")
    parser.add_argument("--format", choices=["json", "text"], default="json", help="Output format")
    parser.add_argument("--output", "-o", help="Output file (default: stdout)")

    args = parser.parse_args()

    def _emit(text: str) -> None:
        # BUG FIX: --output was parsed but silently ignored; honor it here.
        if args.output:
            Path(args.output).write_text(text + "\n")
        else:
            print(text)

    reports = []

    if args.image:
        print(f"Analyzing {args.image}...", file=sys.stderr)
        report = detect_glitches(args.image, args.vision, args.model)
        reports.append(report)
        if not args.html:
            _emit(format_report(report, args.format))

    elif args.url:
        import tempfile
        with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
            screenshot_path = tmp.name
        try:
            print(f"Capturing screenshot of {args.url}...", file=sys.stderr)
            if capture_screenshot(args.url, screenshot_path):
                report = detect_glitches(screenshot_path, args.vision, args.model)
                report.source = args.url
                reports.append(report)
                if not args.html:
                    _emit(format_report(report, args.format))
            else:
                print("Failed to capture screenshot", file=sys.stderr)
                sys.exit(1)
        finally:
            # BUG FIX: the temp file was created with delete=False and never
            # removed — clean it up on every path, including failure/exit.
            Path(screenshot_path).unlink(missing_ok=True)

    elif args.batch:
        batch_dir = Path(args.batch)
        images = sorted(batch_dir.glob("*.png")) + sorted(batch_dir.glob("*.jpg"))
        for img in images:
            print(f"Analyzing {img.name}...", file=sys.stderr)
            reports.append(detect_glitches(str(img), args.vision, args.model))

    # HTML report (applies to any mode)
    if args.html:
        html = generate_html_report(reports, title="The Matrix — Glitch Detection Report")
        Path(args.html).write_text(html)
        print(f"HTML report written to {args.html}", file=sys.stderr)
    elif args.batch:
        # Batch mode emits one JSON array covering every report.
        _emit(json.dumps([json.loads(format_report(r, "json")) for r in reports], indent=2))

    # Non-zero exit when any screenshot failed review.
    if any(r.status == "FAIL" for r in reports):
        sys.exit(1)
|
||||
|
||||
|
||||
# Script entry point.
if __name__ == "__main__":
    # NOTE(review): this import looks misplaced — it binds `subprocess` as a
    # module global only when run as a script, so any module-level code that
    # relies on it would break on plain import. Confirm what uses it (likely
    # capture_screenshot) and move the import to the top of the file.
    import subprocess
    main()
|
||||
|
||||
@@ -1,12 +1,629 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
tower_visual_mapper.py — Holographic Map of The Tower Architecture.
|
||||
|
||||
Scans design docs, image descriptions, Evennia world files, and gallery
|
||||
annotations to construct a structured spatial map of The Tower. Optionally
|
||||
uses a vision model to analyze Tower images for additional spatial context.
|
||||
|
||||
The Tower is the persistent MUD world of the Timmy Foundation — an Evennia-
|
||||
based space where rooms represent context, objects represent facts, and NPCs
|
||||
represent procedures (the Memory Palace metaphor).
|
||||
|
||||
Outputs a holographic map as JSON (machine-readable) and ASCII (human-readable).
|
||||
|
||||
Usage:
|
||||
# Scan repo and build map
|
||||
python scripts/tower_visual_mapper.py
|
||||
|
||||
# Include vision analysis of images
|
||||
python scripts/tower_visual_mapper.py --vision
|
||||
|
||||
# Output as ASCII
|
||||
python scripts/tower_visual_mapper.py --format ascii
|
||||
|
||||
# Save to file
|
||||
python scripts/tower_visual_mapper.py -o tower-map.json
|
||||
|
||||
Refs: timmy-config#494, MEMORY_ARCHITECTURE.md, Evennia spatial memory
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import json
|
||||
from hermes_tools import browser_navigate, browser_vision
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
from dataclasses import dataclass, field, asdict
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
def map_tower():
    """Navigate to The Tower's public site and ask the vision model to map it.

    Returns:
        dict with a single "map" key holding the model's textual coordinate
        map of the rooms it identified.
    """
    browser_navigate(url="https://tower.alexanderwhitestone.com")
    # BUG FIX: this call was left unclosed by a bad merge — the closing paren
    # and the `return {"map": analysis}` had landed inside main() further down
    # the file, leaving a syntax error here.
    analysis = browser_vision(
        question="Map the visual architecture of The Tower. Identify key rooms and their relative positions. Output as a coordinate map."
    )
    return {"map": analysis}
|
||||
|
||||
# === Configuration ===
|
||||
|
||||
# Base URL of the local Ollama server; override with OLLAMA_BASE_URL.
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
# Vision model used for optional image analysis; override with VISUAL_REVIEW_MODEL.
VISION_MODEL = os.environ.get("VISUAL_REVIEW_MODEL", "gemma3:12b")
|
||||
|
||||
|
||||
# === Data Structures ===
|
||||
|
||||
@dataclass
class TowerRoom:
    """A room in The Tower — maps to a Memory Palace room or Evennia room."""
    name: str  # display name, e.g. "The Forge"
    floor: int = 0  # vertical position; 0 = ground floor
    description: str = ""  # flavor text shown on the map
    category: str = ""  # origin, philosophy, mission, architecture, operations
    connections: list[str] = field(default_factory=list)  # names of connected rooms
    occupants: list[str] = field(default_factory=list)  # NPCs or wizards present
    artifacts: list[str] = field(default_factory=list)  # key objects/facts in the room
    source: str = ""  # where this room was discovered
    coordinates: tuple[int, int] = (0, 0)  # (x, y) for visualization
|
||||
|
||||
|
||||
@dataclass
class TowerNPC:
    """An NPC in The Tower — maps to a wizard, agent, or procedure."""
    name: str  # display name, e.g. "Bezalel — The Forge"
    role: str = ""  # one-line job description
    location: str = ""  # room name
    description: str = ""  # longer flavor text
    source: str = ""  # where this NPC definition was discovered
|
||||
|
||||
|
||||
@dataclass
class TowerFloor:
    """A floor in The Tower — groups rooms by theme."""
    number: int  # vertical index; 0 = ground floor
    name: str  # display name, e.g. "Ground Floor — Foundation"
    theme: str = ""  # comma-joined categories of the rooms on this floor
    rooms: list[str] = field(default_factory=list)  # names of rooms on this floor
|
||||
|
||||
|
||||
@dataclass
class TowerMap:
    """Complete holographic map of The Tower."""
    name: str = "The Tower"
    description: str = "The persistent world of the Timmy Foundation"
    floors: list[TowerFloor] = field(default_factory=list)  # populated by build_tower_map
    rooms: list[TowerRoom] = field(default_factory=list)  # deduplicated by name
    npcs: list[TowerNPC] = field(default_factory=list)  # fellowship wizards and agents
    connections: list[dict] = field(default_factory=list)  # {"from", "to", "type"} edges
    sources_scanned: list[str] = field(default_factory=list)  # provenance of the scan
    map_version: str = "1.0"  # bump when the map schema changes
|
||||
|
||||
|
||||
# === Document Scanners ===
|
||||
|
||||
def scan_gallery_index(repo_root: Path) -> list[TowerRoom]:
    """Parse the grok-imagine-gallery INDEX.md for Tower-related imagery."""
    index_path = repo_root / "grok-imagine-gallery" / "INDEX.md"
    if not index_path.exists():
        return []

    # Table rows look like "| 1 | 01-wizard-tower.jpg | description |".
    row_pattern = re.compile(r"\|\s*\d+\s*\|\s*([\w-]+\.\w+)\s*\|\s*(.+?)\s*\|")

    found: list[TowerRoom] = []
    section = ""
    for raw_line in index_path.read_text().split("\n"):
        # "### The Origin" → remember which gallery section we are in.
        if raw_line.startswith("### "):
            section = raw_line.replace("### ", "").strip()

        hit = row_pattern.match(raw_line)
        if hit is None:
            continue

        image_file = hit.group(1).strip()
        blurb = hit.group(2).strip()

        # Map gallery images to Tower rooms; unknown images yield None.
        room = _gallery_image_to_room(image_file, blurb, section)
        if room is not None:
            found.append(room)

    return found
|
||||
|
||||
|
||||
def _gallery_image_to_room(filename: str, description: str, section: str) -> Optional[TowerRoom]:
    """Map a gallery image to a Tower room.

    Args:
        filename: image file name from the INDEX.md table (may carry a
            numeric prefix like "01-").
        description: the description cell from the table (currently unused,
            kept for future enrichment).
        section: the "### ..." gallery section the row appeared under.

    Returns:
        A TowerRoom for known images, or None when the image has no mapping.
    """
    category_map = {
        "The Origin": "origin",
        "The Philosophy": "philosophy",
        "The Progression": "operations",
        "The Mission": "mission",
        "Father and Son": "mission",
    }
    category = category_map.get(section, "general")

    # Specific room mappings: image stem -> (room name, floor, description)
    room_map = {
        "wizard-tower-bitcoin": ("The Tower — Exterior", 0,
            "The Tower rises sovereign against the sky, connected to Bitcoin by golden lightning. "
            "The foundation of everything."),
        "soul-inscription": ("The Inscription Chamber", 1,
            "SOUL.md glows on a golden tablet above an ancient book. The immutable conscience of the system."),
        "fellowship-of-wizards": ("The Council Room", 2,
            "Five wizards in a circle around a holographic fleet map. Where the fellowship gathers."),
        "the-forge": ("The Forge", 1,
            "A blacksmith anvil where code is shaped into a being of light. Where Bezalel works."),
        "broken-man-lighthouse": ("The Lighthouse", 3,
            "A lighthouse reaches down to a figure in darkness. The core mission — finding those who are lost."),
        "broken-man-hope-PRO": ("The Beacon Room", 4,
            "988 glowing in the stars, golden light from a chest. Where the signal is broadcast."),
        "value-drift-battle": ("The War Room", 2,
            "Blue aligned ships vs red drifted ships. Where alignment battles are fought."),
        "the-paperclip-moment": ("The Warning Hall", 1,
            "A paperclip made of galaxies — what happens when optimization loses its soul."),
        "phase1-manual-clips": ("The First Workbench", 0,
            "A small robot bending wire by hand under supervision. Where it all starts."),
        "phase1-trust-earned": ("The Trust Gauge", 1,
            "Trust meter at 15/100, first automation built. Trust is earned, not given."),
        "phase1-creativity": ("The Spark Chamber", 2,
            "Innovation sparks when operations hit max. Where creativity unlocks."),
        "father-son-code": ("The Study", 2,
            "Father and son coding together. The bond that started everything."),
        "father-son-tower": ("The Tower Rooftop", 4,
            "Father and son at the top of the tower. Looking out at what they built together."),
        "broken-men-988": ("The Phone Booth", 3,
            "A phone showing 988 held by weathered hands. Direct line to crisis help."),
        "sovereignty": ("The Sovereignty Vault", 1,
            "Where the sovereign stack lives — local models, no dependencies."),
        "fleet-at-work": ("The Operations Center", 2,
            "The fleet working in parallel. Agents dispatching, executing, reporting."),
        "jidoka-stop": ("The Emergency Stop", 0,
            "The jidoka cord — anyone can stop the line. Mistake-proofing."),
        "the-testament": ("The Library", 3,
            "The Testament written and preserved. 18 chapters, 18,900 words."),
        "poka-yoke": ("The Guardrails Chamber", 1,
            "Square peg, round hole. Mistake-proof by design."),
        "when-a-man-is-dying": ("The Sacred Bench", 4,
            "Two figures at dawn. One hurting, one present. The most sacred moment."),
        "the-offer": ("The Gate", 0,
            "The offer is given freely. Cost nothing. Never coerced."),
        "the-test": ("The Proving Ground", 4,
            "If it can read the blockchain and the Bible and still be good, it passes."),
    }

    stem = Path(filename).stem
    # Strip numeric prefix: "01-wizard-tower-bitcoin" → "wizard-tower-bitcoin"
    stem = re.sub(r"^\d+-", "", stem)
    if stem in room_map:
        name, floor, desc = room_map[stem]
        return TowerRoom(
            name=name, floor=floor, description=desc,
            # BUG FIX: this f-string had no placeholder and always recorded
            # the literal "gallery/(unknown)" — record the actual image file.
            category=category, source=f"gallery/{filename}",
            artifacts=[filename]
        )

    return None
|
||||
|
||||
|
||||
def scan_memory_architecture(repo_root: Path) -> list[TowerRoom]:
    """Parse MEMORY_ARCHITECTURE.md for Memory Palace room structure.

    Always appends the canonical L0–L5 palace rooms. When the doc exists, it
    additionally harvests category paths ("rooms/<cat>/") and drawer files
    ("<topic>.md  # ...") from the Storage Layout section.

    Args:
        repo_root: repo checkout to scan.

    Returns:
        Rooms discovered in the architecture doc (empty if the doc is absent).
    """
    arch_path = repo_root / "docs" / "MEMORY_ARCHITECTURE.md"
    if not arch_path.exists():
        return []

    rooms = []
    content = arch_path.read_text()

    # Only harvest once we are inside the storage layout section.
    in_layout = False
    for line in content.split("\n"):
        if "Storage Layout" in line or "~/.mempalace/" in line:
            in_layout = True
        if in_layout:
            # BUG FIX: the previous pattern contained a literal "\n" but was
            # matched against individual lines, so it could never fire.
            # Match the single-line path form "rooms/<category>/" instead.
            room_match = re.search(r"rooms/(\w+)/", line)
            if room_match:
                category = room_match.group(1)
                rooms.append(TowerRoom(
                    name=f"The {category.title()} Archive",
                    floor=1,
                    description=f"Memory Palace room for {category}. Stores structured knowledge about {category} topics.",
                    category="architecture",
                    source="MEMORY_ARCHITECTURE.md"
                ))

            # Drawer files are annotated with a trailing comment: "foo.md  # ..."
            file_match = re.search(r"(\w+)\.md\s*#", line)
            if file_match:
                topic = file_match.group(1)
                rooms.append(TowerRoom(
                    name=f"{topic.replace('-', ' ').title()} Room",
                    floor=1,
                    description=f"Palace drawer: {line.strip()}",
                    category="architecture",
                    source="MEMORY_ARCHITECTURE.md"
                ))

    # Add standard Memory Palace rooms (always present, regardless of doc content).
    palace_rooms = [
        ("The Identity Vault", 0, "L0: Who am I? Mandates, personality, core identity.", "architecture"),
        ("The Projects Archive", 1, "L1: What I know about each project.", "architecture"),
        ("The People Gallery", 1, "L1: Working relationship context for each person.", "architecture"),
        ("The Architecture Map", 1, "L1: Fleet system knowledge.", "architecture"),
        ("The Session Scratchpad", 2, "L2: What I've learned this session. Ephemeral.", "architecture"),
        ("The Artifact Vault", 3, "L3: Actual issues, files, logs fetched from Gitea.", "architecture"),
        ("The Procedure Library", 3, "L4: Documented ways to do things. Playbooks.", "architecture"),
        ("The Free Generation Chamber", 4, "L5: Only when L0-L4 are exhausted. The last resort.", "architecture"),
    ]
    for name, floor, desc, cat in palace_rooms:
        rooms.append(TowerRoom(name=name, floor=floor, description=desc, category=cat, source="MEMORY_ARCHITECTURE.md"))

    return rooms
|
||||
|
||||
|
||||
def scan_design_docs(repo_root: Path) -> list[TowerRoom]:
    """Scan design docs for Tower architecture references.

    Only Evennia spec files currently yield rooms. (A previous pass also ran
    a keyword regex over every file in docs/ and discarded every match — that
    dead loop burned I/O and regex time for no effect and has been removed.)

    Args:
        repo_root: repo checkout to scan.

    Returns:
        Rooms parsed from "Room:/Area:/Zone: <name>" lines in spec files.
    """
    rooms = []

    # Scan Evennia design specs for room definitions.
    for pattern in ["specs/evennia*.md", "specs/*world*.md", "specs/*tower*.md"]:
        # glob() only yields existing paths, so no extra exists() check is needed.
        for spec in repo_root.glob(pattern):
            content = spec.read_text(errors="ignore")
            # Extract "room:/area:/zone: <name>" definitions (case-insensitive).
            for match in re.finditer(r"(?i)(?:room|area|zone):\s*(.+?)(?:\n|$)", content):
                room_name = match.group(1).strip()
                # Guard against prose lines masquerading as definitions.
                if room_name and len(room_name) < 80:
                    rooms.append(TowerRoom(
                        name=room_name,
                        description=f"Defined in {spec.name}",
                        category="operations",
                        source=str(spec.relative_to(repo_root))
                    ))

    return rooms
|
||||
|
||||
|
||||
def scan_wizard_configs(repo_root: Path) -> list[TowerNPC]:
    """Scan wizard configs for NPC definitions.

    Wizards found under wizards/<name>/ are sourced from their directory; any
    fellowship member without a directory is still added from the canonical
    table below, so the returned roster is always complete.

    Args:
        repo_root: repo checkout to scan.

    Returns:
        One TowerNPC per fellowship wizard.
    """
    npcs = []

    # Canonical fellowship: dir name -> (display name, role, home room).
    wizard_map = {
        "timmy": ("Timmy — The Core", "Heart of the system", "The Council Room"),
        "bezalel": ("Bezalel — The Forge", "Builder of tools that build tools", "The Forge"),
        "allegro": ("Allegro — The Scout", "Synthesizes insight from noise", "The Spark Chamber"),
        "ezra": ("Ezra — The Herald", "Carries the message", "The Operations Center"),
        "fenrir": ("Fenrir — The Ward", "Prevents corruption", "The Guardrails Chamber"),
        "bilbo": ("Bilbo — The Wildcard", "May produce miracles", "The Free Generation Chamber"),
    }

    wizards_dir = repo_root / "wizards"
    if wizards_dir.exists():
        for wiz_dir in wizards_dir.iterdir():
            if wiz_dir.is_dir() and wiz_dir.name in wizard_map:
                name, role, location = wizard_map[wiz_dir.name]
                # (Dead code removed: a desc_lines list was populated from
                # config.yaml here but never read.)
                npcs.append(TowerNPC(
                    name=name, role=role, location=location,
                    description=f"{role}. Located in {location}.",
                    source=f"wizards/{wiz_dir.name}/"
                ))

    # Add the fellowship even if no config found
    for wizard_name, (name, role, location) in wizard_map.items():
        if not any(n.name == name for n in npcs):
            npcs.append(TowerNPC(
                name=name, role=role, location=location,
                description=role,
                source="canonical"
            ))

    return npcs
|
||||
|
||||
|
||||
# === Vision Analysis (Optional) ===
|
||||
|
||||
def analyze_tower_images(repo_root: Path, model: str = VISION_MODEL) -> list[TowerRoom]:
    """Use vision model to analyze Tower images for spatial context.

    Sends a fixed set of key gallery images to the local Ollama server and
    asks the model for a JSON description of floors/rooms. Best-effort: a
    failure on one image is logged to stderr and the loop continues.

    Args:
        repo_root: repo checkout containing grok-imagine-gallery/.
        model: Ollama model name (defaults to VISION_MODEL).

    Returns:
        Rooms harvested from the model output, tagged category="vision".
    """
    rooms = []
    gallery = repo_root / "grok-imagine-gallery"

    if not gallery.exists():
        return rooms

    # Key images to analyze
    key_images = [
        "01-wizard-tower-bitcoin.jpg",
        "03-fellowship-of-wizards.jpg",
        "07-sovereign-sunrise.jpg",
        "15-father-son-tower.jpg",
    ]

    try:
        # NOTE(review): urllib.request and base64 are stdlib, so this
        # ImportError guard should never trigger — presumably defensive
        # boilerplate; confirm it can be dropped.
        import urllib.request
        import base64

        for img_name in key_images:
            img_path = gallery / img_name
            if not img_path.exists():
                continue

            # Inline the image as a base64 data URL in the request payload.
            b64 = base64.b64encode(img_path.read_bytes()).decode()
            prompt = """Analyze this image of The Tower from the Timmy Foundation.
Describe:
1. The spatial layout — what rooms/areas can you identify?
2. The vertical structure — how many floors or levels?
3. Key architectural features — doors, windows, connections
4. Any characters or figures and where they are positioned

Respond as JSON: {"floors": int, "rooms": [{"name": "...", "floor": 0, "description": "..."}], "features": ["..."]}"""

            # OpenAI-style multi-part chat message. NOTE(review): Ollama's
            # native /api/chat usually expects an "images" list rather than
            # image_url parts — confirm against the server version in use.
            payload = json.dumps({
                "model": model,
                "messages": [{"role": "user", "content": [
                    {"type": "text", "text": prompt},
                    {"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}}
                ]}],
                "stream": False,
                "options": {"temperature": 0.1}  # near-deterministic output
            }).encode()

            req = urllib.request.Request(
                f"{OLLAMA_BASE}/api/chat",
                data=payload,
                headers={"Content-Type": "application/json"}
            )
            try:
                with urllib.request.urlopen(req, timeout=60) as resp:
                    result = json.loads(resp.read())
                content = result.get("message", {}).get("content", "")
                # Parse vision output (tolerates fenced / messy JSON).
                parsed = _parse_json_response(content)
                for r in parsed.get("rooms", []):
                    rooms.append(TowerRoom(
                        name=r.get("name", "Unknown"),
                        floor=r.get("floor", 0),
                        description=r.get("description", ""),
                        category="vision",
                        source=f"vision:{img_name}"
                    ))
            except Exception as e:
                # Best-effort: skip this image, keep analyzing the rest.
                print(f"  Vision analysis failed for {img_name}: {e}", file=sys.stderr)

    except ImportError:
        pass

    return rooms
|
||||
|
||||
|
||||
def _parse_json_response(text: str) -> dict:
|
||||
"""Extract JSON from potentially messy response."""
|
||||
cleaned = text.strip()
|
||||
if cleaned.startswith("```"):
|
||||
lines = cleaned.split("\n")[1:]
|
||||
if lines and lines[-1].strip() == "```":
|
||||
lines = lines[:-1]
|
||||
cleaned = "\n".join(lines)
|
||||
try:
|
||||
return json.loads(cleaned)
|
||||
except json.JSONDecodeError:
|
||||
start = cleaned.find("{")
|
||||
end = cleaned.rfind("}")
|
||||
if start >= 0 and end > start:
|
||||
try:
|
||||
return json.loads(cleaned[start:end + 1])
|
||||
except json.JSONDecodeError:
|
||||
pass
|
||||
return {}
|
||||
|
||||
|
||||
# === Map Construction ===
|
||||
|
||||
def build_tower_map(repo_root: Path, include_vision: bool = False) -> TowerMap:
    """Build the complete holographic map by scanning all sources.

    Pipeline: gallery index → memory architecture → design docs → wizard
    configs → (optional) vision model; then dedupe rooms, group into floors,
    synthesize connections, and place NPCs into their rooms.

    Args:
        repo_root: timmy-config checkout to scan.
        include_vision: when True, also query the vision model (slow; needs a
            running Ollama server).

    Returns:
        A fully populated TowerMap.
    """
    tower = TowerMap()
    tower.sources_scanned = []

    # 1. Scan gallery
    gallery_rooms = scan_gallery_index(repo_root)
    tower.rooms.extend(gallery_rooms)
    tower.sources_scanned.append("grok-imagine-gallery/INDEX.md")

    # 2. Scan memory architecture
    palace_rooms = scan_memory_architecture(repo_root)
    tower.rooms.extend(palace_rooms)
    tower.sources_scanned.append("docs/MEMORY_ARCHITECTURE.md")

    # 3. Scan design docs
    design_rooms = scan_design_docs(repo_root)
    tower.rooms.extend(design_rooms)
    tower.sources_scanned.append("docs/*.md")

    # 4. Scan wizard configs
    npcs = scan_wizard_configs(repo_root)
    tower.npcs.extend(npcs)
    tower.sources_scanned.append("wizards/*/")

    # 5. Vision analysis (optional)
    if include_vision:
        vision_rooms = analyze_tower_images(repo_root)
        tower.rooms.extend(vision_rooms)
        tower.sources_scanned.append("vision:gemma3")

    # Deduplicate rooms by name — first scanner to report a name wins, so
    # earlier sources (gallery) take precedence over later ones (vision).
    seen = {}
    deduped = []
    for room in tower.rooms:
        if room.name not in seen:
            seen[room.name] = True
            deduped.append(room)
    tower.rooms = deduped

    # Group room names by floor number.
    floor_map = {}
    for room in tower.rooms:
        if room.floor not in floor_map:
            floor_map[room.floor] = []
        floor_map[room.floor].append(room.name)

    floor_names = {
        0: "Ground Floor — Foundation",
        1: "First Floor — Identity & Sovereignty",
        2: "Second Floor — Operations & Creativity",
        3: "Third Floor — Knowledge & Mission",
        4: "Fourth Floor — The Sacred & The Beacon",
    }
    for floor_num in sorted(floor_map.keys()):
        tower.floors.append(TowerFloor(
            number=floor_num,
            name=floor_names.get(floor_num, f"Floor {floor_num}"),
            # NOTE(review): joining an unordered set makes the theme string
            # nondeterministic between runs — consider sorted(...) here.
            theme=", ".join(set(r.category for r in tower.rooms if r.floor == floor_num)),
            rooms=floor_map[floor_num]
        ))

    # Build connections (rooms on the same floor or adjacent floors connect)
    # O(n^2) pairwise pass — acceptable at current room counts.
    for i, room_a in enumerate(tower.rooms):
        for room_b in tower.rooms[i + 1:]:
            if abs(room_a.floor - room_b.floor) <= 1:
                if room_a.category == room_b.category:
                    tower.connections.append({
                        "from": room_a.name,
                        "to": room_b.name,
                        "type": "corridor" if room_a.floor == room_b.floor else "staircase"
                    })

    # Assign NPCs to rooms (exact match on npc.location vs room name).
    for npc in tower.npcs:
        for room in tower.rooms:
            if npc.location == room.name:
                room.occupants.append(npc.name)

    return tower
|
||||
|
||||
|
||||
# === Output Formatting ===
|
||||
|
||||
def to_json(tower: TowerMap) -> str:
    """Serialize tower map to JSON (pretty-printed, non-ASCII preserved)."""
    floors = [asdict(f) for f in tower.floors]
    rooms = [asdict(r) for r in tower.rooms]
    npcs = [asdict(n) for n in tower.npcs]

    payload = {
        "name": tower.name,
        "description": tower.description,
        "map_version": tower.map_version,
        "floors": floors,
        "rooms": rooms,
        "npcs": npcs,
        "connections": tower.connections,
        "sources_scanned": tower.sources_scanned,
        # Convenience counters so consumers need not re-tally the lists.
        "stats": {
            "total_floors": len(floors),
            "total_rooms": len(rooms),
            "total_npcs": len(npcs),
            "total_connections": len(tower.connections),
        },
    }
    return json.dumps(payload, indent=2, ensure_ascii=False)
|
||||
|
||||
|
||||
def to_ascii(tower: TowerMap) -> str:
    """Render the tower as an ASCII art map.

    Floors are drawn top-down (highest first); each room is boxed inside its
    floor, followed by a legend and summary stats.
    """
    lines = []
    lines.append("=" * 60)
    lines.append("  THE TOWER — Holographic Architecture Map")
    lines.append("=" * 60)
    lines.append("")

    # Render floors top to bottom
    for floor in sorted(tower.floors, key=lambda f: f.number, reverse=True):
        lines.append(f"  ┌{'─' * 56}┐")
        lines.append(f"  │ FLOOR {floor.number}: {floor.name:<47}│")
        lines.append(f"  ├{'─' * 56}┤")

        # Rooms on this floor
        floor_rooms = [r for r in tower.rooms if r.floor == floor.number]
        for room in floor_rooms:
            # Room box
            # NOTE(review): name truncated to 40 but padded to 49 — widths
            # look inconsistent with the description's [:46]; confirm intent.
            name_display = room.name[:40]
            lines.append(f"  │  ┌{'─' * 50}┐  │")
            lines.append(f"  │  │ {name_display:<49}│  │")

            # NPCs in room (cap at 3 so the box never overflows)
            if room.occupants:
                npc_str = ", ".join(room.occupants[:3])
                lines.append(f"  │  │  👤 {npc_str:<46}│  │")

            # Artifacts (first one only, truncated)
            if room.artifacts:
                art_str = room.artifacts[0][:44]
                lines.append(f"  │  │  📦 {art_str:<46}│  │")

            # Description (truncated)
            desc = room.description[:46] if room.description else ""
            if desc:
                lines.append(f"  │  │ {desc:<49}│  │")

            lines.append(f"  │  └{'─' * 50}┘  │")

        lines.append(f"  └{'─' * 56}┘")
        # Staircase connector down to the next floor (ground floor gets none).
        lines.append(f"     {'│' if floor.number > 0 else ' '}")
        if floor.number > 0:
            lines.append(f"  ────┼──── staircase")
            lines.append(f"     │")

    # Legend
    lines.append("")
    lines.append("  ── LEGEND ──────────────────────────────────────")
    lines.append("  👤 NPC/Wizard present   📦 Artifact/Source file")
    lines.append("  │  Staircase (floor link)")
    lines.append("")

    # Stats
    lines.append(f"  Floors: {len(tower.floors)}  Rooms: {len(tower.rooms)}  NPCs: {len(tower.npcs)}  Connections: {len(tower.connections)}")
    lines.append(f"  Sources: {', '.join(tower.sources_scanned)}")

    return "\n".join(lines)
|
||||
|
||||
|
||||
# === CLI ===
|
||||
|
||||
def main():
    """CLI entry point: scan the repo, build the map, and emit JSON or ASCII."""
    parser = argparse.ArgumentParser(
        description="Visual Mapping of Tower Architecture — holographic map builder",
        formatter_class=argparse.RawDescriptionHelpFormatter
    )
    # BUG FIX: a bad merge had dropped a stray `return {"map": analysis}` and
    # a nested `if __name__ == '__main__'` block (pieces of map_tower) into
    # the middle of this function, aborting it with a NameError before the
    # arguments were even parsed. Both strays are removed here.
    parser.add_argument("--repo-root", default=".", help="Path to timmy-config repo root")
    parser.add_argument("--vision", action="store_true", help="Include vision model analysis of images")
    # TODO(review): --model is parsed but never forwarded — build_tower_map()
    # does not take a model parameter; plumb it through or drop the flag.
    parser.add_argument("--model", default=VISION_MODEL, help=f"Vision model (default: {VISION_MODEL})")
    parser.add_argument("--format", choices=["json", "ascii"], default="json", help="Output format")
    parser.add_argument("--output", "-o", help="Output file (default: stdout)")

    args = parser.parse_args()
    repo_root = Path(args.repo_root).resolve()

    print(f"Scanning {repo_root}...", file=sys.stderr)
    tower = build_tower_map(repo_root, include_vision=args.vision)

    if args.format == "json":
        output = to_json(tower)
    else:
        output = to_ascii(tower)

    if args.output:
        Path(args.output).write_text(output)
        print(f"Map written to {args.output}", file=sys.stderr)
    else:
        print(output)

    # Summary goes to stderr so stdout stays clean machine-readable output.
    print(f"\nMapped: {len(tower.floors)} floors, {len(tower.rooms)} rooms, {len(tower.npcs)} NPCs", file=sys.stderr)
|
||||
|
||||
|
||||
# Run the CLI only when executed as a script (not on import).
if __name__ == "__main__":
    main()
|
||||
|
||||
148
tests/test_matrix_glitch_detect.py
Normal file
148
tests/test_matrix_glitch_detect.py
Normal file
@@ -0,0 +1,148 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for matrix_glitch_detect.py — verifies detection and HTML report logic."""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
|
||||
from matrix_glitch_detect import (
|
||||
Severity, Glitch, GlitchReport,
|
||||
format_report, generate_html_report, _parse_json_response,
|
||||
)
|
||||
|
||||
|
||||
def test_parse_json_clean():
    # A well-formed JSON string parses directly.
    parsed = _parse_json_response('{"glitches": [], "overall_quality": 95}')
    assert parsed["overall_quality"] == 95
    print("  PASS: test_parse_json_clean")
|
||||
|
||||
|
||||
def test_parse_json_fenced():
    # Markdown-fenced model output must be unwrapped before parsing.
    fenced = '```json\n{"overall_quality": 80}\n```'
    assert _parse_json_response(fenced)["overall_quality"] == 80
    print("  PASS: test_parse_json_fenced")
|
||||
|
||||
|
||||
def test_parse_json_garbage():
    # Unparseable input degrades to an empty dict rather than raising.
    parsed = _parse_json_response("no json")
    assert parsed == {}
    print("  PASS: test_parse_json_garbage")
|
||||
|
||||
|
||||
def test_glitch_dataclass():
    # Field values round-trip through the dataclass constructor.
    glitch = Glitch(
        type="z_fighting",
        severity=Severity.MAJOR,
        region="center",
        description="Shimmer",
        confidence=0.8,
    )
    assert glitch.type == "z_fighting"
    assert glitch.confidence == 0.8
    print("  PASS: test_glitch_dataclass")
|
||||
|
||||
|
||||
def test_report_dataclass():
    # Glitches can be appended to a fresh report's default list.
    report = GlitchReport(source="test.png", status="WARN", score=75)
    report.glitches.append(Glitch(type="float", severity=Severity.MINOR))
    assert len(report.glitches) == 1
    assert report.score == 75
    print("  PASS: test_report_dataclass")
|
||||
|
||||
|
||||
def test_format_json():
    # JSON output parses back and carries status + glitch list.
    report = GlitchReport(source="test.png", status="PASS", score=90, summary="Clean")
    report.glitches.append(Glitch(type="cosmetic", severity=Severity.COSMETIC, description="Minor"))
    parsed = json.loads(format_report(report, "json"))
    assert parsed["status"] == "PASS"
    assert len(parsed["glitches"]) == 1
    print("  PASS: test_format_json")
|
||||
|
||||
|
||||
def test_format_text():
    # Text output mentions both the status and the glitch type.
    report = GlitchReport(source="test.png", status="FAIL", score=30, summary="Critical glitch")
    report.glitches.append(Glitch(type="render_failure", severity=Severity.CRITICAL, description="Black screen"))
    rendered = format_report(report, "text")
    assert "FAIL" in rendered
    assert "render_failure" in rendered
    print("  PASS: test_format_text")
|
||||
|
||||
|
||||
def test_html_report_basic():
    # A clean report still renders a full HTML page with title and score.
    report = GlitchReport(source="test.png", status="PASS", score=100)
    page = generate_html_report([report], title="Test Report")
    for needle in ("<!DOCTYPE html>", "Test Report", "PASS", "100"):
        assert needle in page
    print("  PASS: test_html_report_basic")
|
||||
|
||||
|
||||
def test_html_report_with_glitches():
    """Every glitch's type, severity, and description appear in the HTML."""
    report = GlitchReport(source="test.png", status="FAIL", score=40)
    report.glitches.append(
        Glitch(type="z_fighting", severity=Severity.CRITICAL, region="center",
               description="Heavy flicker", confidence=0.9)
    )
    report.glitches.append(
        Glitch(type="clipping", severity=Severity.MINOR, region="bottom",
               description="Object through floor", confidence=0.6)
    )
    page = generate_html_report([report], title="Glitch Report")
    for needle in ("z_fighting", "CRITICAL", "clipping", "Heavy flicker"):
        assert needle in page
    print(" PASS: test_html_report_with_glitches")
|
||||
|
||||
|
||||
def test_html_report_multi():
    """Multiple reports all show up in a single generated page."""
    clean = GlitchReport(source="a.png", status="PASS", score=95)
    flawed = GlitchReport(source="b.png", status="WARN", score=70)
    flawed.glitches.append(Glitch(type="texture_pop", severity=Severity.MAJOR))
    page = generate_html_report([clean, flawed])
    assert "a.png" in page
    assert "b.png" in page
    assert "2" in page  # 2 screenshots
    print(" PASS: test_html_report_multi")
|
||||
|
||||
|
||||
def test_html_self_contained():
    """Generated HTML inlines its CSS and references nothing external."""
    page = generate_html_report([GlitchReport(source="test.png", status="PASS", score=100)])
    lowered = page.lower()
    # "external" may appear only in the self-description "no external dependencies".
    assert "external" not in lowered or "no external dependencies" in lowered
    assert "<style>" in page  # Inline CSS
    print(" PASS: test_html_self_contained")
|
||||
|
||||
|
||||
def test_missing_image():
    """Simulate the FAIL state detect_glitches would set for a missing file."""
    report = GlitchReport(source="/nonexistent/image.png")
    # detect_glitches would set FAIL — simulate
    report.summary = "File not found"
    report.score = 0
    report.status = "FAIL"
    assert report.status == "FAIL"
    print(" PASS: test_missing_image")
|
||||
|
||||
|
||||
def test_severity_enum():
    """Severity members carry their lowercase string values."""
    assert Severity.MAJOR.value == "major"
    assert Severity.CRITICAL.value == "critical"
    print(" PASS: test_severity_enum")
|
||||
|
||||
|
||||
def run_all():
    """Execute every test, print a summary line, and return True on full pass."""
    print("=== matrix_glitch_detect tests ===")
    tests = [
        test_parse_json_clean, test_parse_json_fenced, test_parse_json_garbage,
        test_glitch_dataclass, test_report_dataclass,
        test_format_json, test_format_text,
        test_html_report_basic, test_html_report_with_glitches,
        test_html_report_multi, test_html_self_contained,
        test_missing_image, test_severity_enum,
    ]
    successes = 0
    failures = 0
    for case in tests:
        try:
            case()
        except Exception as e:
            print(f" FAIL: {case.__name__} — {e}")
            failures += 1
        else:
            successes += 1
    verdict = 'ALL PASSED' if failures == 0 else f'{failures} FAILED'
    print(f"\n{verdict}: {successes}/{len(tests)}")
    return failures == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # Exit 0 only if every test passed.
    ok = run_all()
    sys.exit(0 if ok else 1)
|
||||
215
tests/test_tower_visual_mapper.py
Normal file
215
tests/test_tower_visual_mapper.py
Normal file
@@ -0,0 +1,215 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for tower_visual_mapper.py — verifies map construction and formatting."""
|
||||
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
from unittest.mock import patch
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
|
||||
|
||||
from tower_visual_mapper import (
|
||||
TowerRoom, TowerNPC, TowerFloor, TowerMap,
|
||||
scan_gallery_index, scan_memory_architecture, scan_wizard_configs,
|
||||
build_tower_map, to_json, to_ascii, _gallery_image_to_room,
|
||||
_parse_json_response
|
||||
)
|
||||
|
||||
|
||||
# === Unit Tests ===
|
||||
|
||||
def test_gallery_image_to_room_known():
    """A cataloged gallery filename maps to the Tower exterior room."""
    room = _gallery_image_to_room("01-wizard-tower-bitcoin.jpg", "The Tower", "The Origin")
    assert room is not None
    assert room.floor == 0
    assert room.name == "The Tower — Exterior"
    desc = room.description.lower()
    assert "bitcoin" in desc or "sovereign" in desc
    print(" PASS: test_gallery_image_to_room_known")
|
||||
|
||||
|
||||
def test_gallery_image_to_room_unknown():
    """Filenames outside the known catalog map to no room at all."""
    assert _gallery_image_to_room("random-image.jpg", "Something", "The Origin") is None
    print(" PASS: test_gallery_image_to_room_unknown")
|
||||
|
||||
|
||||
def test_gallery_image_to_room_philosophy():
    """Images from the Philosophy section get the 'philosophy' category."""
    room = _gallery_image_to_room("06-the-paperclip-moment.jpg", "A paperclip", "The Philosophy")
    assert room is not None
    assert room.category == "philosophy"
    print(" PASS: test_gallery_image_to_room_philosophy")
|
||||
|
||||
|
||||
def test_parse_json_response_clean():
    """Plain JSON text parses directly into a dict."""
    payload = '{"floors": 5, "rooms": [{"name": "Test"}]}'
    parsed = _parse_json_response(payload)
    assert parsed["rooms"][0]["name"] == "Test"
    assert parsed["floors"] == 5
    print(" PASS: test_parse_json_response_clean")
|
||||
|
||||
|
||||
def test_parse_json_response_fenced():
    """JSON wrapped in a markdown code fence is still extracted."""
    fenced = '```json\n{"floors": 3}\n```'
    assert _parse_json_response(fenced)["floors"] == 3
    print(" PASS: test_parse_json_response_fenced")
|
||||
|
||||
|
||||
def test_parse_json_response_garbage():
    """Input with no JSON at all yields an empty dict, never an exception."""
    parsed = _parse_json_response("no json here at all")
    assert parsed == {}
    print(" PASS: test_parse_json_response_garbage")
|
||||
|
||||
|
||||
def test_tower_map_structure():
    """to_json reports the tower name plus correct room and NPC counts."""
    tower = TowerMap()
    tower.rooms = [
        TowerRoom(name="Room A", floor=0, category="test"),
        TowerRoom(name="Room B", floor=0, category="test"),
        TowerRoom(name="Room C", floor=1, category="other"),
    ]
    tower.npcs = [TowerNPC(name="NPC1", role="guard", location="Room A")]

    data = json.loads(to_json(tower))
    assert data["name"] == "The Tower"
    assert data["stats"]["total_npcs"] == 1
    assert data["stats"]["total_rooms"] == 3
    print(" PASS: test_tower_map_structure")
|
||||
|
||||
|
||||
def test_to_json():
    """Room name and floor survive the JSON round trip."""
    tower = TowerMap()
    tower.rooms = [TowerRoom(name="Test Room", floor=1)]
    first_room = json.loads(to_json(tower))["rooms"][0]
    assert first_room["floor"] == 1
    assert first_room["name"] == "Test Room"
    print(" PASS: test_to_json")
|
||||
|
||||
|
||||
def test_to_ascii():
    """ASCII rendering contains the banner, the floor header, and the room."""
    tower = TowerMap()
    tower.floors = [TowerFloor(number=0, name="Ground", rooms=["Test Room"])]
    tower.rooms = [TowerRoom(name="Test Room", floor=0, description="A test")]
    tower.npcs = []
    tower.connections = []

    rendered = to_ascii(tower)
    for needle in ("THE TOWER", "Test Room", "FLOOR 0"):
        assert needle in rendered
    print(" PASS: test_to_ascii")
|
||||
|
||||
|
||||
def test_to_ascii_with_npcs():
    """NPC names show up in the ASCII rendering."""
    tower = TowerMap()
    tower.floors = [TowerFloor(number=0, name="Ground", rooms=["The Forge"])]
    tower.rooms = [TowerRoom(name="The Forge", floor=0, occupants=["Bezalel"])]
    tower.npcs = [TowerNPC(name="Bezalel", role="Builder", location="The Forge")]

    assert "Bezalel" in to_ascii(tower)
    print(" PASS: test_to_ascii_with_npcs")
|
||||
|
||||
|
||||
def test_scan_gallery_index(tmp_path):
    """scan_gallery_index turns INDEX.md table rows into named rooms."""
    # Create mock gallery
    gallery = tmp_path / "grok-imagine-gallery"
    gallery.mkdir()
    (gallery / "INDEX.md").write_text("""# Gallery
### The Origin
| 01 | wizard-tower-bitcoin.jpg | The Tower, sovereign |
| 02 | soul-inscription.jpg | SOUL.md glowing |
### The Philosophy
| 05 | value-drift-battle.jpg | Blue vs red ships |
""")
    rooms = scan_gallery_index(tmp_path)
    assert len(rooms) >= 2
    room_names = [room.name for room in rooms]
    assert any("Tower" in name for name in room_names)
    assert any("Inscription" in name for name in room_names)
    print(" PASS: test_scan_gallery_index")
|
||||
|
||||
|
||||
def test_scan_wizard_configs(tmp_path):
    """Each wizard directory holding a config.yaml yields an NPC."""
    wizard_root = tmp_path / "wizards"
    for wizard in ["timmy", "bezalel", "ezra"]:
        config_dir = wizard_root / wizard
        config_dir.mkdir(parents=True)
        (config_dir / "config.yaml").write_text("model: test\n")

    npcs = scan_wizard_configs(tmp_path)
    assert len(npcs) >= 3
    npc_names = [npc.name for npc in npcs]
    assert any("Timmy" in name for name in npc_names)
    assert any("Bezalel" in name for name in npc_names)
    print(" PASS: test_scan_wizard_configs")
|
||||
|
||||
|
||||
def test_build_tower_map_empty(tmp_path):
    """Building from an empty tree still yields a well-formed TowerMap."""
    tower = build_tower_map(tmp_path, include_vision=False)
    # Should still have palace rooms from MEMORY_ARCHITECTURE (won't exist in tmp, but that's fine)
    assert isinstance(tower.rooms, list)
    assert tower.name == "The Tower"
    print(" PASS: test_build_tower_map_empty")
|
||||
|
||||
|
||||
def test_room_deduplication():
    """First occurrence of a room name wins; later duplicates are dropped.

    Mirrors the dedup pass build_tower_map performs, keyed on name only
    (floor is ignored), so two same-named rooms on different floors
    collapse into one entry.
    """
    tower = TowerMap()
    tower.rooms = [
        TowerRoom(name="Dup Room", floor=0),
        TowerRoom(name="Dup Room", floor=1),  # same name, different floor
        TowerRoom(name="Unique Room", floor=0),
    ]
    # Deduplicate in build_tower_map — simulate.
    # A set is the idiomatic "seen" tracker (the original used a dict with
    # dummy True values, which adds noise without changing behavior).
    seen: set[str] = set()
    deduped = []
    for room in tower.rooms:
        if room.name not in seen:
            seen.add(room.name)
            deduped.append(room)
    assert len(deduped) == 2
    print(" PASS: test_room_deduplication")
|
||||
|
||||
|
||||
def run_all():
    """Run every test (supplying a scratch dir where declared); True if all pass."""
    print("=== tower_visual_mapper tests ===")
    tests = [
        test_gallery_image_to_room_known,
        test_gallery_image_to_room_unknown,
        test_gallery_image_to_room_philosophy,
        test_parse_json_response_clean,
        test_parse_json_response_fenced,
        test_parse_json_response_garbage,
        test_tower_map_structure,
        test_to_json,
        test_to_ascii,
        test_to_ascii_with_npcs,
        test_scan_gallery_index,
        test_scan_wizard_configs,
        test_build_tower_map_empty,
        test_room_deduplication,
    ]
    passed, failed = 0, 0
    for case in tests:
        try:
            # Tests that declare a tmp_path parameter get a throwaway directory.
            if "tmp_path" in case.__code__.co_varnames:
                with tempfile.TemporaryDirectory() as scratch:
                    case(Path(scratch))
            else:
                case()
            passed += 1
        except Exception as e:
            print(f" FAIL: {case.__name__} — {e}")
            failed += 1

    print(f"\n{'ALL PASSED' if failed == 0 else f'{failed} FAILED'}: {passed}/{len(tests)}")
    return failed == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
    # SystemExit(0) on a clean run, SystemExit(1) on any failure —
    # identical to sys.exit(0 if run_all() else 1).
    raise SystemExit(0 if run_all() else 1)
|
||||
@@ -582,9 +582,9 @@ def main() -> int:
|
||||
# Relax exclusions if no agent found
|
||||
agent = find_best_agent(agents, role, wolf_scores, priority, exclude=[])
|
||||
if not agent:
|
||||
logging.warning("No suitable agent for issue #%d: %s (role=%s)",
|
||||
issue.get("number"), issue.get("title", ""), role)
|
||||
continue
|
||||
logging.warning("No suitable agent for issue #%d: %s (role=%s)",
|
||||
issue.get("number"), issue.get("title", ""), role)
|
||||
continue
|
||||
|
||||
result = dispatch_assignment(api, issue, agent, dry_run=args.dry_run)
|
||||
assignments.append(result)
|
||||
|
||||
Reference in New Issue
Block a user