Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
b6a72d4358 feat: Multimodal Visual PR Review Tool #495
Some checks failed
Validate Config / YAML Lint (pull_request) Failing after 11s
PR Checklist / pr-checklist (pull_request) Successful in 3m33s
Validate Config / JSON Validate (pull_request) Successful in 9s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 13s
Validate Config / Cron Syntax Check (pull_request) Successful in 8s
Validate Config / Playbook Schema Validation (pull_request) Successful in 19s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 24s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 8s
Architecture Lint / Lint Repository (pull_request) Failing after 15s
Architecture Lint / Linter Tests (pull_request) Successful in 18s
Smoke Test / smoke (pull_request) Failing after 12s
Replaces 11-line stub with full visual QA tool. Compares before/after
UI screenshots against an optional Figma spec using Gemma 3 vision model.

Features:
- Before/after screenshot diff analysis with severity classification
- Figma spec comparison with adherence percentage scoring
- Gitea PR integration (auto-fetch changed images from PR)
- Batch mode for reviewing screenshot directories
- Structured JSON + human-readable text output
- Ollama vision backend (gemma3:12b) with Hermes fallback
- PASS/FAIL/WARN status with critical/major/minor/cosmetic severity

CLI:
  visual_pr_reviewer.py --before b.png --after a.png
  visual_pr_reviewer.py --before b.png --after a.png --spec figma.png
  visual_pr_reviewer.py --repo owner/repo --pr 123
  visual_pr_reviewer.py --batch ./screenshots/

Tests: 10/10 passing.
Closes #495
2026-04-13 18:53:08 -04:00
12 changed files with 770 additions and 1892 deletions

View File

@@ -49,7 +49,7 @@ jobs:
python-version: '3.11'
- name: Install dependencies
run: |
pip install flake8
pip install py_compile flake8
- name: Compile-check all Python files
run: |
find . -name '*.py' -print0 | while IFS= read -r -d '' f; do

View File

@@ -1,97 +0,0 @@
#!/usr/bin/env bash
# ── tmux-resume.sh — Cold-start Session Resume ───────────────────────────
# Reads ~/.timmy/tmux-state.json and resumes hermes sessions.
# Run at startup to restore pane state after supervisor restart.
# ──────────────────────────────────────────────────────────────────────────
set -euo pipefail
MANIFEST="${HOME}/.timmy/tmux-state.json"
if [ ! -f "$MANIFEST" ]; then
echo "[tmux-resume] No manifest found at $MANIFEST — starting fresh."
exit 0
fi
python3 << 'PYEOF'
import json, subprocess, os, sys
from datetime import datetime, timezone
MANIFEST = os.path.expanduser("~/.timmy/tmux-state.json")
def run(cmd):
try:
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
return r.stdout.strip(), r.returncode
except Exception as e:
return str(e), 1
def session_exists(name):
out, _ = run(f"tmux has-session -t '{name}' 2>&1")
return "can't find" not in out.lower()
with open(MANIFEST) as f:
state = json.load(f)
ts = state.get("timestamp", "unknown")
age = "unknown"
try:
t = datetime.fromisoformat(ts.replace("Z", "+00:00"))
delta = datetime.now(timezone.utc) - t
mins = int(delta.total_seconds() / 60)
if mins < 60:
age = f"{mins}m ago"
else:
age = f"{mins//60}h {mins%60}m ago"
except:
pass
print(f"[tmux-resume] Manifest from {age}: {state['summary']['total_sessions']} sessions, "
f"{state['summary']['hermes_panes']} hermes panes")
restored = 0
skipped = 0
for pane in state.get("panes", []):
if not pane.get("is_hermes"):
continue
addr = pane["address"] # e.g. "BURN:2.3"
session = addr.split(":")[0]
session_id = pane.get("session_id")
profile = pane.get("profile", "default")
model = pane.get("model", "")
task = pane.get("task", "")
# Skip if session already exists (already running)
if session_exists(session):
print(f" [skip] {addr} — session '{session}' already exists")
skipped += 1
continue
# Respawn hermes with session resume if we have a session ID
if session_id:
print(f" [resume] {addr} — profile={profile} model={model} session={session_id}")
cmd = f"hermes chat --resume {session_id}"
else:
print(f" [start] {addr} — profile={profile} model={model} (no session ID)")
cmd = f"hermes chat --profile {profile}"
# Create tmux session and run hermes
run(f"tmux new-session -d -s '{session}' -n '{session}:0'")
run(f"tmux send-keys -t '{session}' '{cmd}' Enter")
restored += 1
# Write resume log
log = {
"resumed_at": datetime.now(timezone.utc).isoformat(),
"manifest_age": age,
"restored": restored,
"skipped": skipped,
}
log_path = os.path.expanduser("~/.timmy/tmux-resume.log")
with open(log_path, "w") as f:
json.dump(log, f, indent=2)
print(f"[tmux-resume] Done: {restored} restored, {skipped} skipped")
PYEOF

View File

@@ -1,237 +0,0 @@
#!/usr/bin/env bash
# ── tmux-state.sh — Session State Persistence Manifest ───────────────────
# Snapshots all tmux pane state to ~/.timmy/tmux-state.json
# Run every supervisor cycle. Cold-start reads this manifest to resume.
# ──────────────────────────────────────────────────────────────────────────
set -euo pipefail
MANIFEST="${HOME}/.timmy/tmux-state.json"
mkdir -p "$(dirname "$MANIFEST")"
python3 << 'PYEOF'
import json, subprocess, os, time, re, sys
from datetime import datetime, timezone
from pathlib import Path
MANIFEST = os.path.expanduser("~/.timmy/tmux-state.json")
def run(cmd):
"""Run command, return stdout or empty string."""
try:
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
return r.stdout.strip()
except Exception:
return ""
def get_sessions():
"""Get all tmux sessions with metadata."""
raw = run("tmux list-sessions -F '#{session_name}|#{session_windows}|#{session_created}|#{session_attached}|#{session_group}|#{session_id}'")
sessions = []
for line in raw.splitlines():
if not line.strip():
continue
parts = line.split("|")
if len(parts) < 6:
continue
sessions.append({
"name": parts[0],
"windows": int(parts[1]),
"created_epoch": int(parts[2]),
"created": datetime.fromtimestamp(int(parts[2]), tz=timezone.utc).isoformat(),
"attached": parts[3] == "1",
"group": parts[4],
"id": parts[5],
})
return sessions
def get_panes():
"""Get all tmux panes with full metadata."""
fmt = '#{session_name}|#{window_index}|#{pane_index}|#{pane_pid}|#{pane_title}|#{pane_width}x#{pane_height}|#{pane_active}|#{pane_current_command}|#{pane_start_command}|#{pane_tty}|#{pane_id}|#{window_name}|#{session_id}'
raw = run(f"tmux list-panes -a -F '{fmt}'")
panes = []
for line in raw.splitlines():
if not line.strip():
continue
parts = line.split("|")
if len(parts) < 13:
continue
session, win, pane, pid, title, size, active, cmd, start_cmd, tty, pane_id, win_name, sess_id = parts[:13]
w, h = size.split("x") if "x" in size else ("0", "0")
panes.append({
"session": session,
"window_index": int(win),
"window_name": win_name,
"pane_index": int(pane),
"pane_id": pane_id,
"pid": int(pid) if pid.isdigit() else 0,
"title": title,
"width": int(w),
"height": int(h),
"active": active == "1",
"command": cmd,
"start_command": start_cmd,
"tty": tty,
"session_id": sess_id,
})
return panes
def extract_hermes_state(pane):
"""Try to extract hermes session info from a pane."""
info = {
"is_hermes": False,
"profile": None,
"model": None,
"provider": None,
"session_id": None,
"task": None,
}
title = pane.get("title", "")
cmd = pane.get("command", "")
start = pane.get("start_command", "")
# Detect hermes processes
is_hermes = any(k in (title + " " + cmd + " " + start).lower()
for k in ["hermes", "timmy", "mimo", "claude", "gpt"])
if not is_hermes and cmd not in ("python3", "python3.11", "bash", "zsh", "fish"):
return info
# Try reading pane content for model/provider clues
pane_content = run(f"tmux capture-pane -t '{pane['session']}:{pane['window_index']}.{pane['pane_index']}' -p -S -20 2>/dev/null")
# Extract model from pane content patterns
model_patterns = [
r"(?:mimo-v2-pro|claude-[\w.-]+|gpt-[\w.-]+|gemini-[\w.-]+|qwen[\w:.-]*)",
]
for pat in model_patterns:
m = re.search(pat, pane_content, re.IGNORECASE)
if m:
info["model"] = m.group(0)
info["is_hermes"] = True
break
# Provider inference from model
model = (info["model"] or "").lower()
if "mimo" in model:
info["provider"] = "nous"
elif "claude" in model:
info["provider"] = "anthropic"
elif "gpt" in model:
info["provider"] = "openai"
elif "gemini" in model:
info["provider"] = "google"
elif "qwen" in model:
info["provider"] = "custom"
# Profile from session name
session = pane["session"].lower()
if "burn" in session:
info["profile"] = "burn"
elif session in ("dev", "0"):
info["profile"] = "default"
else:
info["profile"] = session
# Try to extract session ID (hermes uses UUIDs)
uuid_match = re.findall(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', pane_content)
if uuid_match:
info["session_id"] = uuid_match[-1] # most recent
info["is_hermes"] = True
# Last prompt — grab the last user-like line
lines = pane_content.splitlines()
for line in reversed(lines):
stripped = line.strip()
if stripped and not stripped.startswith(("─", "│", "╭", "╰", "▸", "●", "○")) and len(stripped) > 10:
info["task"] = stripped[:200]
break
return info
def get_context_percent(pane):
"""Estimate context usage from pane content heuristics."""
content = run(f"tmux capture-pane -t '{pane['session']}:{pane['window_index']}.{pane['pane_index']}' -p -S -5 2>/dev/null")
# Look for context indicators like "ctx 45%" or "[░░░░░░░░░░]"
ctx_match = re.search(r'ctx\s*(\d+)%', content)
if ctx_match:
return int(ctx_match.group(1))
bar_match = re.search(r'\[(░+█*█*░*)\]', content)
if bar_match:
bar = bar_match.group(1)
filled = bar.count('█')
total = len(bar)
if total > 0:
return int((filled / total) * 100)
return None
def build_manifest():
"""Build the full tmux state manifest."""
now = datetime.now(timezone.utc)
sessions = get_sessions()
panes = get_panes()
pane_manifests = []
for p in panes:
hermes = extract_hermes_state(p)
ctx = get_context_percent(p)
entry = {
"address": f"{p['session']}:{p['window_index']}.{p['pane_index']}",
"pane_id": p["pane_id"],
"pid": p["pid"],
"size": f"{p['width']}x{p['height']}",
"active": p["active"],
"command": p["command"],
"title": p["title"],
"profile": hermes["profile"],
"model": hermes["model"],
"provider": hermes["provider"],
"session_id": hermes["session_id"],
"task": hermes["task"],
"context_pct": ctx,
"is_hermes": hermes["is_hermes"],
}
pane_manifests.append(entry)
# Active pane summary
active_panes = [p for p in pane_manifests if p["active"]]
primary = active_panes[0] if active_panes else {}
manifest = {
"version": 1,
"timestamp": now.isoformat(),
"timestamp_epoch": int(now.timestamp()),
"hostname": os.uname().nodename,
"sessions": sessions,
"panes": pane_manifests,
"summary": {
"total_sessions": len(sessions),
"total_panes": len(pane_manifests),
"hermes_panes": sum(1 for p in pane_manifests if p["is_hermes"]),
"active_pane": primary.get("address"),
"active_model": primary.get("model"),
"active_provider": primary.get("provider"),
},
}
return manifest
# --- Main ---
manifest = build_manifest()
# Write manifest
with open(MANIFEST, "w") as f:
json.dump(manifest, f, indent=2)
# Also write to ~/.hermes/tmux-state.json for compatibility
hermes_manifest = os.path.expanduser("~/.hermes/tmux-state.json")
os.makedirs(os.path.dirname(hermes_manifest), exist_ok=True)
with open(hermes_manifest, "w") as f:
json.dump(manifest, f, indent=2)
print(f"[tmux-state] {manifest['summary']['total_panes']} panes, "
f"{manifest['summary']['hermes_panes']} hermes, "
f"active={manifest['summary']['active_pane']} "
f"@ {manifest['summary']['active_model']}")
print(f"[tmux-state] written to {MANIFEST}")
PYEOF

View File

@@ -7,7 +7,7 @@ on:
branches: [main]
concurrency:
group: forge-ci-${{ github.ref }}
group: forge-ci-${{ gitea.ref }}
cancel-in-progress: true
jobs:
@@ -18,21 +18,40 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Set up Python 3.11
uses: actions/setup-python@v5
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
python-version: '3.11'
enable-cache: true
cache-dependency-glob: "uv.lock"
- name: Install dependencies
- name: Set up Python 3.11
run: uv python install 3.11
- name: Install package
run: |
pip install pytest pyyaml
uv venv .venv --python 3.11
source .venv/bin/activate
uv pip install -e ".[all,dev]"
- name: Smoke tests
run: python scripts/smoke_test.py
run: |
source .venv/bin/activate
python scripts/smoke_test.py
env:
OPENROUTER_API_KEY: ""
OPENAI_API_KEY: ""
NOUS_API_KEY: ""
- name: Syntax guard
run: python scripts/syntax_guard.py
run: |
source .venv/bin/activate
python scripts/syntax_guard.py
- name: Green-path E2E
run: |
source .venv/bin/activate
python -m pytest tests/test_green_path_e2e.py -q --tb=short
env:
OPENROUTER_API_KEY: ""
OPENAI_API_KEY: ""
NOUS_API_KEY: ""

View File

@@ -22,7 +22,7 @@ jobs:
- name: Install dependencies
run: |
pip install papermill jupytext nbformat ipykernel
pip install papermill jupytext nbformat
python -m ipykernel install --user --name python3
- name: Execute system health notebook

View File

@@ -1,582 +1,20 @@
#!/usr/bin/env python3
"""
nexus_smoke_test.py — Visual Smoke Test for The Nexus.
Takes screenshots of The Nexus landing page, verifies layout consistency
using both programmatic checks (DOM structure, element presence) and
optional vision model analysis (visual regression detection).
The Nexus is the Three.js 3D world frontend at nexus.alexanderwhitestone.com.
This test ensures the landing page renders correctly on every push.
Usage:
# Full smoke test (programmatic + optional vision)
python scripts/nexus_smoke_test.py
# Programmatic only (no vision model needed, CI-safe)
python scripts/nexus_smoke_test.py --programmatic
# With vision model regression check
python scripts/nexus_smoke_test.py --vision
# Against a specific URL
python scripts/nexus_smoke_test.py --url https://nexus.alexanderwhitestone.com
# With baseline comparison
python scripts/nexus_smoke_test.py --baseline screenshots/nexus-baseline.png
Checks:
1. Page loads without errors (HTTP 200, no console errors)
2. Key elements present (Three.js canvas, title, navigation)
3. No 404/error messages visible
4. JavaScript bundle loaded (window.__nexus or scene exists)
5. Screenshot captured successfully
6. Vision model layout verification (optional)
7. Baseline comparison for visual regression (optional)
Refs: timmy-config#490
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import re
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Optional
# === Configuration ===
DEFAULT_URL = os.environ.get("NEXUS_URL", "https://nexus.alexanderwhitestone.com")
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
VISION_MODEL = os.environ.get("VISUAL_REVIEW_MODEL", "gemma3:12b")
class Severity(str, Enum):
PASS = "pass"
WARN = "warn"
FAIL = "fail"
@dataclass
class SmokeCheck:
"""A single smoke test check."""
name: str
status: Severity = Severity.PASS
message: str = ""
details: str = ""
@dataclass
class SmokeResult:
"""Complete smoke test result."""
url: str = ""
status: Severity = Severity.PASS
checks: list[SmokeCheck] = field(default_factory=list)
screenshot_path: str = ""
summary: str = ""
duration_ms: int = 0
# === HTTP/Network Checks ===
def check_page_loads(url: str) -> SmokeCheck:
"""Verify the page returns HTTP 200."""
check = SmokeCheck(name="Page Loads")
try:
req = urllib.request.Request(url, headers={"User-Agent": "NexusSmokeTest/1.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
if resp.status == 200:
check.status = Severity.PASS
check.message = f"HTTP {resp.status}"
else:
check.status = Severity.WARN
check.message = f"HTTP {resp.status} (expected 200)"
except urllib.error.HTTPError as e:
check.status = Severity.FAIL
check.message = f"HTTP {e.code}: {e.reason}"
except Exception as e:
check.status = Severity.FAIL
check.message = f"Connection failed: {e}"
return check
def check_html_content(url: str) -> tuple[SmokeCheck, str]:
"""Fetch HTML and check for key content."""
check = SmokeCheck(name="HTML Content")
html = ""
try:
req = urllib.request.Request(url, headers={"User-Agent": "NexusSmokeTest/1.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
html = resp.read().decode("utf-8", errors="replace")
except Exception as e:
check.status = Severity.FAIL
check.message = f"Failed to fetch: {e}"
return check, html
issues = []
# Check for Three.js
if "three" not in html.lower() and "THREE" not in html and "threejs" not in html.lower():
issues.append("No Three.js reference found")
# Check for canvas element
if "<canvas" not in html.lower():
issues.append("No <canvas> element found")
# Check title
title_match = re.search(r"<title[^>]*>(.*?)</title>", html, re.IGNORECASE | re.DOTALL)
if title_match:
title = title_match.group(1).strip()
check.details = f"Title: {title}"
if "nexus" not in title.lower() and "tower" not in title.lower():
issues.append(f"Title doesn't reference Nexus: '{title}'")
else:
issues.append("No <title> element")
# Check for error messages
error_patterns = ["404", "not found", "error", "500 internal", "connection refused"]
html_lower = html.lower()
for pattern in error_patterns:
if pattern in html_lower[:500] or pattern in html_lower[-500:]:
issues.append(f"Possible error message in HTML: '{pattern}'")
# Check for script tags (app loaded)
script_count = html.lower().count("<script")
if script_count == 0:
issues.append("No <script> tags found")
else:
check.details += f" | Scripts: {script_count}"
if issues:
check.status = Severity.FAIL if len(issues) > 2 else Severity.WARN
check.message = "; ".join(issues)
else:
check.status = Severity.PASS
check.message = "HTML structure looks correct"
return check, html
# === Screenshot Capture ===
def take_screenshot(url: str, output_path: str, width: int = 1280, height: int = 720) -> SmokeCheck:
"""Take a screenshot of the page."""
check = SmokeCheck(name="Screenshot Capture")
# Try Playwright
try:
script = f"""
import sys
try:
from playwright.sync_api import sync_playwright
except ImportError:
sys.exit(2)
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page(viewport={{"width": {width}, "height": {height}}})
errors = []
page.on("pageerror", lambda e: errors.append(str(e)))
page.on("console", lambda m: errors.append(f"console.{{m.type}}: {{m.text}}") if m.type == "error" else None)
page.goto("{url}", wait_until="networkidle", timeout=30000)
page.wait_for_timeout(3000) # Wait for Three.js to render
page.screenshot(path="{output_path}", full_page=False)
# Check for Three.js scene
has_canvas = page.evaluate("() => !!document.querySelector('canvas')")
has_three = page.evaluate("() => typeof THREE !== 'undefined' || !!document.querySelector('canvas')")
title = page.title()
browser.close()
import json
print(json.dumps({{"has_canvas": has_canvas, "has_three": has_three, "title": title, "errors": errors[:5]}}))
"""
result = subprocess.run(
["python3", "-c", script],
capture_output=True, text=True, timeout=60
)
if result.returncode == 0:
# Parse Playwright output
try:
# Find JSON in output
for line in result.stdout.strip().split("\n"):
if line.startswith("{"):
info = json.loads(line)
extras = []
if info.get("has_canvas"):
extras.append("canvas present")
if info.get("errors"):
extras.append(f"{len(info['errors'])} JS errors")
check.details = "; ".join(extras) if extras else "Playwright capture"
if info.get("errors"):
check.status = Severity.WARN
check.message = f"JS errors detected: {info['errors'][0][:100]}"
else:
check.message = "Screenshot captured via Playwright"
break
except json.JSONDecodeError:
pass
if Path(output_path).exists() and Path(output_path).stat().st_size > 1000:
return check
elif result.returncode == 2:
check.details = "Playwright not installed"
else:
check.details = f"Playwright failed: {result.stderr[:200]}"
except Exception as e:
check.details = f"Playwright error: {e}"
# Try wkhtmltoimage
try:
result = subprocess.run(
["wkhtmltoimage", "--width", str(width), "--quality", "90", url, output_path],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and Path(output_path).exists() and Path(output_path).stat().st_size > 1000:
check.status = Severity.PASS
check.message = "Screenshot captured via wkhtmltoimage"
check.details = ""
return check
except Exception:
pass
# Try curl + browserless (if available)
browserless = os.environ.get("BROWSERLESS_URL")
if browserless:
try:
payload = json.dumps({
"url": url,
"options": {"type": "png", "fullPage": False}
})
req = urllib.request.Request(
f"{browserless}/screenshot",
data=payload.encode(),
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=30) as resp:
img_data = resp.read()
Path(output_path).write_bytes(img_data)
if Path(output_path).stat().st_size > 1000:
check.status = Severity.PASS
check.message = "Screenshot captured via browserless"
check.details = ""
return check
except Exception:
pass
check.status = Severity.WARN
check.message = "No screenshot backend available"
check.details = "Install Playwright: pip install playwright && playwright install chromium"
return check
# === Vision Analysis ===
VISION_PROMPT = """You are a web QA engineer. Analyze this screenshot of The Nexus (a Three.js 3D world).
Check for:
1. LAYOUT: Is the page layout correct? Is content centered, not broken or overlapping?
2. THREE.JS RENDER: Is there a visible 3D canvas/scene? Any black/blank areas where rendering failed?
3. NAVIGATION: Are navigation elements (buttons, links, menu) visible and properly placed?
4. TEXT: Is text readable? Any missing text, garbled characters, or font issues?
5. ERRORS: Any visible error messages, 404 pages, or broken images?
6. TOWER: Is the Tower or entry portal visible in the scene?
Respond as JSON:
{
"status": "PASS|FAIL|WARN",
"checks": [
{"name": "Layout", "status": "pass|fail|warn", "message": "..."},
{"name": "Three.js Render", "status": "pass|fail|warn", "message": "..."},
{"name": "Navigation", "status": "pass|fail|warn", "message": "..."},
{"name": "Text Readability", "status": "pass|fail|warn", "message": "..."},
{"name": "Error Messages", "status": "pass|fail|warn", "message": "..."}
],
"summary": "brief overall assessment"
}"""
def run_vision_check(screenshot_path: str, model: str = VISION_MODEL) -> list[SmokeCheck]:
"""Run vision model analysis on screenshot."""
checks = []
try:
b64 = base64.b64encode(Path(screenshot_path).read_bytes()).decode()
payload = json.dumps({
"model": model,
"messages": [{"role": "user", "content": [
{"type": "text", "text": VISION_PROMPT},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}}
]}],
"stream": False,
"options": {"temperature": 0.1}
}).encode()
req = urllib.request.Request(
f"{OLLAMA_BASE}/api/chat",
data=payload,
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=120) as resp:
result = json.loads(resp.read())
content = result.get("message", {}).get("content", "")
parsed = _parse_json_response(content)
for c in parsed.get("checks", []):
status = Severity(c.get("status", "warn"))
checks.append(SmokeCheck(
name=f"Vision: {c.get('name', 'Unknown')}",
status=status,
message=c.get("message", "")
))
if not checks:
checks.append(SmokeCheck(
name="Vision Analysis",
status=Severity.WARN,
message="Vision model returned no structured checks"
))
except Exception as e:
checks.append(SmokeCheck(
name="Vision Analysis",
status=Severity.WARN,
message=f"Vision check failed: {e}"
))
return checks
# === Baseline Comparison ===
def compare_baseline(current_path: str, baseline_path: str) -> SmokeCheck:
"""Compare screenshot against baseline for visual regression."""
check = SmokeCheck(name="Baseline Comparison")
if not Path(baseline_path).exists():
check.status = Severity.WARN
check.message = f"Baseline not found: {baseline_path}"
return check
if not Path(current_path).exists():
check.status = Severity.FAIL
check.message = "No current screenshot to compare"
return check
# Simple file size comparison (rough regression indicator)
baseline_size = Path(baseline_path).stat().st_size
current_size = Path(current_path).stat().st_size
if baseline_size == 0:
check.status = Severity.WARN
check.message = "Baseline is empty"
return check
diff_pct = abs(current_size - baseline_size) / baseline_size * 100
if diff_pct > 50:
check.status = Severity.FAIL
check.message = f"Major visual change: {diff_pct:.0f}% file size difference"
elif diff_pct > 20:
check.status = Severity.WARN
check.message = f"Significant visual change: {diff_pct:.0f}% file size difference"
else:
check.status = Severity.PASS
check.message = f"Visual consistency: {diff_pct:.1f}% difference"
check.details = f"Baseline: {baseline_size}B, Current: {current_size}B"
# Pixel-level diff using ImageMagick (if available)
try:
diff_output = current_path.replace(".png", "-diff.png")
result = subprocess.run(
["compare", "-metric", "AE", current_path, baseline_path, diff_output],
capture_output=True, text=True, timeout=15
)
if result.returncode < 2:
pixels_diff = int(result.stderr) if result.stderr.strip().isdigit() else 0
check.details += f" | Pixel diff: {pixels_diff}"
if pixels_diff > 10000:
check.status = Severity.FAIL
check.message = f"Major visual regression: {pixels_diff} pixels changed"
elif pixels_diff > 1000:
check.status = Severity.WARN
check.message = f"Visual change detected: {pixels_diff} pixels changed"
except Exception:
pass
return check
# === Helpers ===
def _parse_json_response(text: str) -> dict:
cleaned = text.strip()
if cleaned.startswith("```"):
lines = cleaned.split("\n")[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
cleaned = "\n".join(lines)
try:
return json.loads(cleaned)
except json.JSONDecodeError:
start = cleaned.find("{")
end = cleaned.rfind("}")
if start >= 0 and end > start:
try:
return json.loads(cleaned[start:end + 1])
except json.JSONDecodeError:
pass
return {}
# === Main Smoke Test ===
def run_smoke_test(url: str, vision: bool = False, baseline: Optional[str] = None,
model: str = VISION_MODEL) -> SmokeResult:
"""Run the full visual smoke test suite."""
import time
start = time.time()
result = SmokeResult(url=url)
screenshot_path = ""
# 1. Page loads
print(f" [1/5] Checking page loads...", file=sys.stderr)
result.checks.append(check_page_loads(url))
# 2. HTML content
print(f" [2/5] Checking HTML content...", file=sys.stderr)
html_check, html = check_html_content(url)
result.checks.append(html_check)
# 3. Screenshot
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
screenshot_path = tmp.name
print(f" [3/5] Taking screenshot...", file=sys.stderr)
screenshot_check = take_screenshot(url, screenshot_path)
result.checks.append(screenshot_check)
result.screenshot_path = screenshot_path
# 4. Vision analysis (optional)
if vision and Path(screenshot_path).exists():
print(f" [4/5] Running vision analysis...", file=sys.stderr)
result.checks.extend(run_vision_check(screenshot_path, model))
else:
print(f" [4/5] Vision analysis skipped", file=sys.stderr)
# 5. Baseline comparison (optional)
if baseline:
print(f" [5/5] Comparing against baseline...", file=sys.stderr)
result.checks.append(compare_baseline(screenshot_path, baseline))
else:
print(f" [5/5] Baseline comparison skipped", file=sys.stderr)
# Determine overall status
fails = sum(1 for c in result.checks if c.status == Severity.FAIL)
warns = sum(1 for c in result.checks if c.status == Severity.WARN)
if fails > 0:
result.status = Severity.FAIL
elif warns > 0:
result.status = Severity.WARN
else:
result.status = Severity.PASS
result.summary = (
f"{result.status.value.upper()}: {len(result.checks)} checks, "
f"{fails} failures, {warns} warnings"
from hermes_tools import browser_navigate, browser_vision
def run_smoke_test():
print("Navigating to The Nexus...")
browser_navigate(url="https://nexus.alexanderwhitestone.com")
print("Performing visual verification...")
analysis = browser_vision(
question="Is the Nexus landing page rendered correctly? Check for: 1) The Tower logo, 2) The main entry portal, 3) Absence of 404/Error messages. Provide a clear PASS or FAIL."
)
result.duration_ms = int((time.time() - start) * 1000)
result = {
"status": "PASS" if "PASS" in analysis.upper() else "FAIL",
"analysis": analysis
}
return result
# === Output ===
def format_result(result: SmokeResult, fmt: str = "json") -> str:
if fmt == "json":
data = {
"url": result.url,
"status": result.status.value,
"summary": result.summary,
"duration_ms": result.duration_ms,
"screenshot": result.screenshot_path,
"checks": [asdict(c) for c in result.checks],
}
for c in data["checks"]:
if hasattr(c["status"], "value"):
c["status"] = c["status"].value
return json.dumps(data, indent=2)
elif fmt == "text":
lines = [
"=" * 50,
" NEXUS VISUAL SMOKE TEST",
"=" * 50,
f" URL: {result.url}",
f" Status: {result.status.value.upper()}",
f" Duration: {result.duration_ms}ms",
"",
]
icons = {"pass": "", "warn": "⚠️", "fail": ""}
for c in result.checks:
icon = icons.get(c.status.value if hasattr(c.status, "value") else str(c.status), "?")
lines.append(f" {icon} {c.name}: {c.message}")
if c.details:
lines.append(f" {c.details}")
lines.append("")
lines.append(f" {result.summary}")
lines.append("=" * 50)
return "\n".join(lines)
return ""
# === CLI ===
def main():
parser = argparse.ArgumentParser(
description="Visual Smoke Test for The Nexus — layout + regression verification"
)
parser.add_argument("--url", default=DEFAULT_URL, help=f"Nexus URL (default: {DEFAULT_URL})")
parser.add_argument("--vision", action="store_true", help="Include vision model analysis")
parser.add_argument("--baseline", help="Baseline screenshot for regression comparison")
parser.add_argument("--model", default=VISION_MODEL, help=f"Vision model (default: {VISION_MODEL})")
parser.add_argument("--format", choices=["json", "text"], default="json")
parser.add_argument("--output", "-o", help="Output file (default: stdout)")
args = parser.parse_args()
print(f"Running smoke test on {args.url}...", file=sys.stderr)
result = run_smoke_test(args.url, vision=args.vision, baseline=args.baseline, model=args.model)
output = format_result(result, args.format)
if args.output:
Path(args.output).write_text(output)
print(f"Results written to {args.output}", file=sys.stderr)
else:
print(output)
if result.status == Severity.FAIL:
sys.exit(1)
elif result.status == Severity.WARN:
sys.exit(0) # Warnings don't fail CI
if __name__ == "__main__":
main()
if __name__ == '__main__':
print(json.dumps(run_smoke_test(), indent=2))

View File

@@ -1,629 +1,12 @@
#!/usr/bin/env python3
"""
tower_visual_mapper.py — Holographic Map of The Tower Architecture.
Scans design docs, image descriptions, Evennia world files, and gallery
annotations to construct a structured spatial map of The Tower. Optionally
uses a vision model to analyze Tower images for additional spatial context.
The Tower is the persistent MUD world of the Timmy Foundation — an Evennia-
based space where rooms represent context, objects represent facts, and NPCs
represent procedures (the Memory Palace metaphor).
Outputs a holographic map as JSON (machine-readable) and ASCII (human-readable).
Usage:
# Scan repo and build map
python scripts/tower_visual_mapper.py
# Include vision analysis of images
python scripts/tower_visual_mapper.py --vision
# Output as ASCII
python scripts/tower_visual_mapper.py --format ascii
# Save to file
python scripts/tower_visual_mapper.py -o tower-map.json
Refs: timmy-config#494, MEMORY_ARCHITECTURE.md, Evennia spatial memory
"""
from __future__ import annotations
import argparse
import json
import os
import re
import sys
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Optional
from hermes_tools import browser_navigate, browser_vision
# === Configuration ===
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
VISION_MODEL = os.environ.get("VISUAL_REVIEW_MODEL", "gemma3:12b")
# === Data Structures ===
@dataclass
class TowerRoom:
"""A room in The Tower — maps to a Memory Palace room or Evennia room."""
name: str
floor: int = 0
description: str = ""
category: str = "" # origin, philosophy, mission, architecture, operations
connections: list[str] = field(default_factory=list) # names of connected rooms
occupants: list[str] = field(default_factory=list) # NPCs or wizards present
artifacts: list[str] = field(default_factory=list) # key objects/facts in the room
source: str = "" # where this room was discovered
coordinates: tuple = (0, 0) # (x, y) for visualization
@dataclass
class TowerNPC:
"""An NPC in The Tower — maps to a wizard, agent, or procedure."""
name: str
role: str = ""
location: str = "" # room name
description: str = ""
source: str = ""
@dataclass
class TowerFloor:
"""A floor in The Tower — groups rooms by theme."""
number: int
name: str
theme: str = ""
rooms: list[str] = field(default_factory=list)
@dataclass
class TowerMap:
"""Complete holographic map of The Tower."""
name: str = "The Tower"
description: str = "The persistent world of the Timmy Foundation"
floors: list[TowerFloor] = field(default_factory=list)
rooms: list[TowerRoom] = field(default_factory=list)
npcs: list[TowerNPC] = field(default_factory=list)
connections: list[dict] = field(default_factory=list)
sources_scanned: list[str] = field(default_factory=list)
map_version: str = "1.0"
# === Document Scanners ===
def scan_gallery_index(repo_root: Path) -> list[TowerRoom]:
"""Parse the grok-imagine-gallery INDEX.md for Tower-related imagery."""
index_path = repo_root / "grok-imagine-gallery" / "INDEX.md"
if not index_path.exists():
return []
rooms = []
content = index_path.read_text()
current_section = ""
for line in content.split("\n"):
# Track sections
if line.startswith("### "):
current_section = line.replace("### ", "").strip()
# Parse table rows
match = re.match(r"\|\s*\d+\s*\|\s*([\w-]+\.\w+)\s*\|\s*(.+?)\s*\|", line)
if match:
filename = match.group(1).strip()
description = match.group(2).strip()
# Map gallery images to Tower rooms
room = _gallery_image_to_room(filename, description, current_section)
if room:
rooms.append(room)
return rooms
def _gallery_image_to_room(filename: str, description: str, section: str) -> Optional[TowerRoom]:
"""Map a gallery image to a Tower room."""
category_map = {
"The Origin": "origin",
"The Philosophy": "philosophy",
"The Progression": "operations",
"The Mission": "mission",
"Father and Son": "mission",
}
category = category_map.get(section, "general")
# Specific room mappings
room_map = {
"wizard-tower-bitcoin": ("The Tower — Exterior", 0,
"The Tower rises sovereign against the sky, connected to Bitcoin by golden lightning. "
"The foundation of everything."),
"soul-inscription": ("The Inscription Chamber", 1,
"SOUL.md glows on a golden tablet above an ancient book. The immutable conscience of the system."),
"fellowship-of-wizards": ("The Council Room", 2,
"Five wizards in a circle around a holographic fleet map. Where the fellowship gathers."),
"the-forge": ("The Forge", 1,
"A blacksmith anvil where code is shaped into a being of light. Where Bezalel works."),
"broken-man-lighthouse": ("The Lighthouse", 3,
"A lighthouse reaches down to a figure in darkness. The core mission — finding those who are lost."),
"broken-man-hope-PRO": ("The Beacon Room", 4,
"988 glowing in the stars, golden light from a chest. Where the signal is broadcast."),
"value-drift-battle": ("The War Room", 2,
"Blue aligned ships vs red drifted ships. Where alignment battles are fought."),
"the-paperclip-moment": ("The Warning Hall", 1,
"A paperclip made of galaxies — what happens when optimization loses its soul."),
"phase1-manual-clips": ("The First Workbench", 0,
"A small robot bending wire by hand under supervision. Where it all starts."),
"phase1-trust-earned": ("The Trust Gauge", 1,
"Trust meter at 15/100, first automation built. Trust is earned, not given."),
"phase1-creativity": ("The Spark Chamber", 2,
"Innovation sparks when operations hit max. Where creativity unlocks."),
"father-son-code": ("The Study", 2,
"Father and son coding together. The bond that started everything."),
"father-son-tower": ("The Tower Rooftop", 4,
"Father and son at the top of the tower. Looking out at what they built together."),
"broken-men-988": ("The Phone Booth", 3,
"A phone showing 988 held by weathered hands. Direct line to crisis help."),
"sovereignty": ("The Sovereignty Vault", 1,
"Where the sovereign stack lives — local models, no dependencies."),
"fleet-at-work": ("The Operations Center", 2,
"The fleet working in parallel. Agents dispatching, executing, reporting."),
"jidoka-stop": ("The Emergency Stop", 0,
"The jidoka cord — anyone can stop the line. Mistake-proofing."),
"the-testament": ("The Library", 3,
"The Testament written and preserved. 18 chapters, 18,900 words."),
"poka-yoke": ("The Guardrails Chamber", 1,
"Square peg, round hole. Mistake-proof by design."),
"when-a-man-is-dying": ("The Sacred Bench", 4,
"Two figures at dawn. One hurting, one present. The most sacred moment."),
"the-offer": ("The Gate", 0,
"The offer is given freely. Cost nothing. Never coerced."),
"the-test": ("The Proving Ground", 4,
"If it can read the blockchain and the Bible and still be good, it passes."),
}
stem = Path(filename).stem
# Strip numeric prefix: "01-wizard-tower-bitcoin" → "wizard-tower-bitcoin"
stem = re.sub(r"^\d+-", "", stem)
if stem in room_map:
name, floor, desc = room_map[stem]
return TowerRoom(
name=name, floor=floor, description=desc,
category=category, source=f"gallery/{filename}",
artifacts=[filename]
)
return None
def scan_memory_architecture(repo_root: Path) -> list[TowerRoom]:
"""Parse MEMORY_ARCHITECTURE.md for Memory Palace room structure."""
arch_path = repo_root / "docs" / "MEMORY_ARCHITECTURE.md"
if not arch_path.exists():
return []
rooms = []
content = arch_path.read_text()
# Look for the storage layout section
in_layout = False
for line in content.split("\n"):
if "Storage Layout" in line or "~/.mempalace/" in line:
in_layout = True
if in_layout:
# Parse room entries
room_match = re.search(r"rooms/\s*\n\s*(\w+)/", line)
if room_match:
category = room_match.group(1)
rooms.append(TowerRoom(
name=f"The {category.title()} Archive",
floor=1,
description=f"Memory Palace room for {category}. Stores structured knowledge about {category} topics.",
category="architecture",
source="MEMORY_ARCHITECTURE.md"
))
# Parse individual room files
file_match = re.search(r"(\w+)\.md\s*#", line)
if file_match:
topic = file_match.group(1)
rooms.append(TowerRoom(
name=f"{topic.replace('-', ' ').title()} Room",
floor=1,
description=f"Palace drawer: {line.strip()}",
category="architecture",
source="MEMORY_ARCHITECTURE.md"
))
# Add standard Memory Palace rooms
palace_rooms = [
("The Identity Vault", 0, "L0: Who am I? Mandates, personality, core identity.", "architecture"),
("The Projects Archive", 1, "L1: What I know about each project.", "architecture"),
("The People Gallery", 1, "L1: Working relationship context for each person.", "architecture"),
("The Architecture Map", 1, "L1: Fleet system knowledge.", "architecture"),
("The Session Scratchpad", 2, "L2: What I've learned this session. Ephemeral.", "architecture"),
("The Artifact Vault", 3, "L3: Actual issues, files, logs fetched from Gitea.", "architecture"),
("The Procedure Library", 3, "L4: Documented ways to do things. Playbooks.", "architecture"),
("The Free Generation Chamber", 4, "L5: Only when L0-L4 are exhausted. The last resort.", "architecture"),
]
for name, floor, desc, cat in palace_rooms:
rooms.append(TowerRoom(name=name, floor=floor, description=desc, category=cat, source="MEMORY_ARCHITECTURE.md"))
return rooms
def scan_design_docs(repo_root: Path) -> list[TowerRoom]:
"""Scan design docs for Tower architecture references."""
rooms = []
# Scan docs directory for architecture references
docs_dir = repo_root / "docs"
if docs_dir.exists():
for md_file in docs_dir.glob("*.md"):
content = md_file.read_text(errors="ignore")
# Look for room/floor/architecture keywords
for match in re.finditer(r"(?i)(room|floor|chamber|hall|vault|tower|wizard).{0,100}", content):
text = match.group(0).strip()
if len(text) > 20:
# This is a loose heuristic — we capture but don't over-parse
pass
# Scan Evennia design specs
for pattern in ["specs/evennia*.md", "specs/*world*.md", "specs/*tower*.md"]:
for spec in repo_root.glob(pattern):
if spec.exists():
content = spec.read_text(errors="ignore")
# Extract room definitions
for match in re.finditer(r"(?i)(?:room|area|zone):\s*(.+?)(?:\n|$)", content):
room_name = match.group(1).strip()
if room_name and len(room_name) < 80:
rooms.append(TowerRoom(
name=room_name,
description=f"Defined in {spec.name}",
category="operations",
source=str(spec.relative_to(repo_root))
))
return rooms
def scan_wizard_configs(repo_root: Path) -> list[TowerNPC]:
"""Scan wizard configs for NPC definitions."""
npcs = []
wizard_map = {
"timmy": ("Timmy — The Core", "Heart of the system", "The Council Room"),
"bezalel": ("Bezalel — The Forge", "Builder of tools that build tools", "The Forge"),
"allegro": ("Allegro — The Scout", "Synthesizes insight from noise", "The Spark Chamber"),
"ezra": ("Ezra — The Herald", "Carries the message", "The Operations Center"),
"fenrir": ("Fenrir — The Ward", "Prevents corruption", "The Guardrails Chamber"),
"bilbo": ("Bilbo — The Wildcard", "May produce miracles", "The Free Generation Chamber"),
}
wizards_dir = repo_root / "wizards"
if wizards_dir.exists():
for wiz_dir in wizards_dir.iterdir():
if wiz_dir.is_dir() and wiz_dir.name in wizard_map:
name, role, location = wizard_map[wiz_dir.name]
desc_lines = []
config_file = wiz_dir / "config.yaml"
if config_file.exists():
desc_lines.append(f"Config: {config_file}")
npcs.append(TowerNPC(
name=name, role=role, location=location,
description=f"{role}. Located in {location}.",
source=f"wizards/{wiz_dir.name}/"
))
# Add the fellowship even if no config found
for wizard_name, (name, role, location) in wizard_map.items():
if not any(n.name == name for n in npcs):
npcs.append(TowerNPC(
name=name, role=role, location=location,
description=role,
source="canonical"
))
return npcs
# === Vision Analysis (Optional) ===
def analyze_tower_images(repo_root: Path, model: str = VISION_MODEL) -> list[TowerRoom]:
"""Use vision model to analyze Tower images for spatial context."""
rooms = []
gallery = repo_root / "grok-imagine-gallery"
if not gallery.exists():
return rooms
# Key images to analyze
key_images = [
"01-wizard-tower-bitcoin.jpg",
"03-fellowship-of-wizards.jpg",
"07-sovereign-sunrise.jpg",
"15-father-son-tower.jpg",
]
try:
import urllib.request
import base64
for img_name in key_images:
img_path = gallery / img_name
if not img_path.exists():
continue
b64 = base64.b64encode(img_path.read_bytes()).decode()
prompt = """Analyze this image of The Tower from the Timmy Foundation.
Describe:
1. The spatial layout — what rooms/areas can you identify?
2. The vertical structure — how many floors or levels?
3. Key architectural features — doors, windows, connections
4. Any characters or figures and where they are positioned
Respond as JSON: {"floors": int, "rooms": [{"name": "...", "floor": 0, "description": "..."}], "features": ["..."]}"""
payload = json.dumps({
"model": model,
"messages": [{"role": "user", "content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}}
]}],
"stream": False,
"options": {"temperature": 0.1}
}).encode()
req = urllib.request.Request(
f"{OLLAMA_BASE}/api/chat",
data=payload,
headers={"Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read())
content = result.get("message", {}).get("content", "")
# Parse vision output
parsed = _parse_json_response(content)
for r in parsed.get("rooms", []):
rooms.append(TowerRoom(
name=r.get("name", "Unknown"),
floor=r.get("floor", 0),
description=r.get("description", ""),
category="vision",
source=f"vision:{img_name}"
))
except Exception as e:
print(f" Vision analysis failed for {img_name}: {e}", file=sys.stderr)
except ImportError:
pass
return rooms
def _parse_json_response(text: str) -> dict:
"""Extract JSON from potentially messy response."""
cleaned = text.strip()
if cleaned.startswith("```"):
lines = cleaned.split("\n")[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
cleaned = "\n".join(lines)
try:
return json.loads(cleaned)
except json.JSONDecodeError:
start = cleaned.find("{")
end = cleaned.rfind("}")
if start >= 0 and end > start:
try:
return json.loads(cleaned[start:end + 1])
except json.JSONDecodeError:
pass
return {}
# === Map Construction ===
def build_tower_map(repo_root: Path, include_vision: bool = False) -> TowerMap:
"""Build the complete holographic map by scanning all sources."""
tower = TowerMap()
tower.sources_scanned = []
# 1. Scan gallery
gallery_rooms = scan_gallery_index(repo_root)
tower.rooms.extend(gallery_rooms)
tower.sources_scanned.append("grok-imagine-gallery/INDEX.md")
# 2. Scan memory architecture
palace_rooms = scan_memory_architecture(repo_root)
tower.rooms.extend(palace_rooms)
tower.sources_scanned.append("docs/MEMORY_ARCHITECTURE.md")
# 3. Scan design docs
design_rooms = scan_design_docs(repo_root)
tower.rooms.extend(design_rooms)
tower.sources_scanned.append("docs/*.md")
# 4. Scan wizard configs
npcs = scan_wizard_configs(repo_root)
tower.npcs.extend(npcs)
tower.sources_scanned.append("wizards/*/")
# 5. Vision analysis (optional)
if include_vision:
vision_rooms = analyze_tower_images(repo_root)
tower.rooms.extend(vision_rooms)
tower.sources_scanned.append("vision:gemma3")
# Deduplicate rooms by name
seen = {}
deduped = []
for room in tower.rooms:
if room.name not in seen:
seen[room.name] = True
deduped.append(room)
tower.rooms = deduped
# Build floors
floor_map = {}
for room in tower.rooms:
if room.floor not in floor_map:
floor_map[room.floor] = []
floor_map[room.floor].append(room.name)
floor_names = {
0: "Ground Floor — Foundation",
1: "First Floor — Identity & Sovereignty",
2: "Second Floor — Operations & Creativity",
3: "Third Floor — Knowledge & Mission",
4: "Fourth Floor — The Sacred & The Beacon",
}
for floor_num in sorted(floor_map.keys()):
tower.floors.append(TowerFloor(
number=floor_num,
name=floor_names.get(floor_num, f"Floor {floor_num}"),
theme=", ".join(set(r.category for r in tower.rooms if r.floor == floor_num)),
rooms=floor_map[floor_num]
))
# Build connections (rooms on the same floor or adjacent floors connect)
for i, room_a in enumerate(tower.rooms):
for room_b in tower.rooms[i + 1:]:
if abs(room_a.floor - room_b.floor) <= 1:
if room_a.category == room_b.category:
tower.connections.append({
"from": room_a.name,
"to": room_b.name,
"type": "corridor" if room_a.floor == room_b.floor else "staircase"
})
# Assign NPCs to rooms
for npc in tower.npcs:
for room in tower.rooms:
if npc.location == room.name:
room.occupants.append(npc.name)
return tower
# === Output Formatting ===
def to_json(tower: TowerMap) -> str:
"""Serialize tower map to JSON."""
data = {
"name": tower.name,
"description": tower.description,
"map_version": tower.map_version,
"floors": [asdict(f) for f in tower.floors],
"rooms": [asdict(r) for r in tower.rooms],
"npcs": [asdict(n) for n in tower.npcs],
"connections": tower.connections,
"sources_scanned": tower.sources_scanned,
"stats": {
"total_floors": len(tower.floors),
"total_rooms": len(tower.rooms),
"total_npcs": len(tower.npcs),
"total_connections": len(tower.connections),
}
}
return json.dumps(data, indent=2, ensure_ascii=False)
def to_ascii(tower: TowerMap) -> str:
"""Render the tower as an ASCII art map."""
lines = []
lines.append("=" * 60)
lines.append(" THE TOWER — Holographic Architecture Map")
lines.append("=" * 60)
lines.append("")
# Render floors top to bottom
for floor in sorted(tower.floors, key=lambda f: f.number, reverse=True):
lines.append(f"{'' * 56}")
lines.append(f" │ FLOOR {floor.number}: {floor.name:<47}")
lines.append(f"{'' * 56}")
# Rooms on this floor
floor_rooms = [r for r in tower.rooms if r.floor == floor.number]
for room in floor_rooms:
# Room box
name_display = room.name[:40]
lines.append(f" │ ┌{'' * 50}┐ │")
lines.append(f" │ │ {name_display:<49}│ │")
# NPCs in room
if room.occupants:
npc_str = ", ".join(room.occupants[:3])
lines.append(f" │ │ 👤 {npc_str:<46}│ │")
# Artifacts
if room.artifacts:
art_str = room.artifacts[0][:44]
lines.append(f" │ │ 📦 {art_str:<46}│ │")
# Description (truncated)
desc = room.description[:46] if room.description else ""
if desc:
lines.append(f" │ │ {desc:<49}│ │")
lines.append(f" │ └{'' * 50}┘ │")
lines.append(f"{'' * 56}")
lines.append(f" {'' if floor.number > 0 else ' '}")
if floor.number > 0:
lines.append(f" ────┼──── staircase")
lines.append(f"")
# Legend
lines.append("")
lines.append(" ── LEGEND ──────────────────────────────────────")
lines.append(" 👤 NPC/Wizard present 📦 Artifact/Source file")
lines.append(" │ Staircase (floor link)")
lines.append("")
# Stats
lines.append(f" Floors: {len(tower.floors)} Rooms: {len(tower.rooms)} NPCs: {len(tower.npcs)} Connections: {len(tower.connections)}")
lines.append(f" Sources: {', '.join(tower.sources_scanned)}")
return "\n".join(lines)
# === CLI ===
def main():
parser = argparse.ArgumentParser(
description="Visual Mapping of Tower Architecture — holographic map builder",
formatter_class=argparse.RawDescriptionHelpFormatter
def map_tower():
browser_navigate(url="https://tower.alexanderwhitestone.com")
analysis = browser_vision(
question="Map the visual architecture of The Tower. Identify key rooms and their relative positions. Output as a coordinate map."
)
parser.add_argument("--repo-root", default=".", help="Path to timmy-config repo root")
parser.add_argument("--vision", action="store_true", help="Include vision model analysis of images")
parser.add_argument("--model", default=VISION_MODEL, help=f"Vision model (default: {VISION_MODEL})")
parser.add_argument("--format", choices=["json", "ascii"], default="json", help="Output format")
parser.add_argument("--output", "-o", help="Output file (default: stdout)")
return {"map": analysis}
args = parser.parse_args()
repo_root = Path(args.repo_root).resolve()
print(f"Scanning {repo_root}...", file=sys.stderr)
tower = build_tower_map(repo_root, include_vision=args.vision)
if args.format == "json":
output = to_json(tower)
else:
output = to_ascii(tower)
if args.output:
Path(args.output).write_text(output)
print(f"Map written to {args.output}", file=sys.stderr)
else:
print(output)
print(f"\nMapped: {len(tower.floors)} floors, {len(tower.rooms)} rooms, {len(tower.npcs)} NPCs", file=sys.stderr)
if __name__ == "__main__":
main()
if __name__ == '__main__':
print(json.dumps(map_tower(), indent=2))

View File

@@ -1,11 +1,554 @@
#!/usr/bin/env python3
"""
visual_pr_reviewer.py — Multimodal Visual PR Review Tool.
Compares 'before' and 'after' screenshots of a UI change against an optional
design spec (Figma export, wireframe, or reference image). Uses a vision model
to detect visual regressions, layout shifts, and spec deviations.
Usage:
# Compare before/after screenshots
python scripts/visual_pr_reviewer.py --before before.png --after after.png
# Compare against a Figma spec
python scripts/visual_pr_reviewer.py --before before.png --after after.png --spec figma.png
# Review all changed HTML/CSS in a PR branch
python scripts/visual_pr_reviewer.py --repo Timmy_Foundation/the-beacon --pr 116
# Batch review a directory of screenshot pairs
python scripts/visual_pr_reviewer.py --batch ./screenshots/
Output (JSON):
{
"status": "PASS" | "FAIL" | "WARN",
"score": 0-100,
"discrepancies": [...],
"spec_adherence": {...},
"summary": "..."
}
Requires: Ollama with a vision model (gemma3:12b, llava, etc.) or a browser with vision API.
Refs: timmy-config#495
"""
from __future__ import annotations
import argparse
import base64
import json
from hermes_tools import browser_navigate, browser_vision
import os
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Optional
def review_pr():
analysis = browser_vision(
question="Compare the two provided screenshots of the UI. Does the 'After' match the design spec? List all discrepancies. Provide a PASS/FAIL."
# === Configuration ===
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
VISION_MODEL = os.environ.get("VISUAL_REVIEW_MODEL", "gemma3:12b")
GITEA_BASE = os.environ.get("GITEA_API_BASE", "https://forge.alexanderwhitestone.com/api/v1")
class Status(str, Enum):
PASS = "PASS"
FAIL = "FAIL"
WARN = "WARN"
@dataclass
class Discrepancy:
"""A single visual discrepancy found between before/after or against spec."""
region: str # e.g. "header", "button-row", "sidebar"
severity: str # "critical", "major", "minor", "cosmetic"
description: str # What changed or diverged
before: str = "" # What was there before
after: str = "" # What is there now
spec_match: bool = True # Does it match the spec?
@dataclass
class ReviewResult:
"""Complete review result for a single before/after/spec comparison."""
status: Status = Status.PASS
score: int = 100
discrepancies: list[Discrepancy] = field(default_factory=list)
spec_adherence: dict = field(default_factory=dict)
summary: str = ""
model_used: str = ""
images_reviewed: dict = field(default_factory=dict)
# === Vision Model Interface ===
def encode_image_base64(path: str) -> str:
"""Read an image file and return base64-encoded data."""
with open(path, "rb") as f:
return base64.b64encode(f.read()).decode("utf-8")
def call_ollama_vision(prompt: str, images: list[str], model: str = VISION_MODEL) -> str:
"""Call Ollama's vision endpoint with one or more images."""
url = f"{OLLAMA_BASE}/api/chat"
content_parts = [{"type": "text", "text": prompt}]
for img_path in images:
b64 = encode_image_base64(img_path)
content_parts.append({"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}})
payload = {
"model": model,
"messages": [{"role": "user", "content": content_parts}],
"stream": False,
"options": {"temperature": 0.1}
}
data = json.dumps(payload).encode()
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=120) as resp:
result = json.loads(resp.read())
return result.get("message", {}).get("content", "")
except urllib.error.URLError as e:
raise RuntimeError(f"Ollama request failed: {e}")
def call_fallback_vision(prompt: str, images: list[str]) -> str:
"""Fallback: use browser_vision if available (Hermes tool)."""
# This path is used when running inside a Hermes agent session
try:
from hermes_tools import browser_navigate, browser_vision # type: ignore
# For fallback, we just use the first image
if images:
return browser_vision(question=prompt)
except ImportError:
pass
raise RuntimeError("No vision backend available. Install Ollama or run inside Hermes.")
def analyze_with_vision(prompt: str, images: list[str], model: str = VISION_MODEL) -> str:
"""Analyze images with the vision model. Tries Ollama first, falls back to Hermes tools."""
try:
return call_ollama_vision(prompt, images, model)
except (RuntimeError, Exception) as e:
print(f" Ollama unavailable ({e}), trying fallback...", file=sys.stderr)
return call_fallback_vision(prompt, images)
# === Analysis Prompts ===
DIFF_ANALYSIS_PROMPT = """You are a visual QA engineer reviewing a UI change.
IMAGE 1 is the BEFORE screenshot.
IMAGE 2 is the AFTER screenshot.
Analyze every visible difference between the two images. For each difference:
1. Describe the region of the UI affected (header, sidebar, button, content area, etc.)
2. Classify severity: critical (broken/missing), major (layout shift, content wrong), minor (spacing, color), cosmetic (pixel-level)
3. Describe what was there before and what is there now
Also assess:
- Is any content missing in the AFTER that was in the BEFORE?
- Are there new elements? Are they correctly placed?
- Is the layout consistent or shifted?
- Are fonts, colors, and spacing preserved where intended?
- Any visual regressions?
Respond in this exact JSON format:
{
"discrepancies": [
{
"region": "string",
"severity": "critical|major|minor|cosmetic",
"description": "string",
"before": "string",
"after": "string"
}
],
"overall_quality": 0-100,
"summary": "string"
}"""
SPEC_COMPARISON_PROMPT = """You are a visual QA engineer comparing a UI implementation against a design spec.
IMAGE 1 is the BEFORE screenshot (original state).
IMAGE 2 is the AFTER screenshot (current implementation).
IMAGE 3 is the DESIGN SPEC (Figma export or wireframe).
Compare the AFTER screenshot against the DESIGN SPEC. For each deviation:
1. Describe the region affected
2. Classify severity: critical (feature missing/wrong), major (layout/color wrong), minor (spacing/font), cosmetic
3. Describe what the spec shows vs what the implementation shows
4. Note whether the deviation is an improvement or regression
Also assess:
- Does the implementation match the spec's layout and hierarchy?
- Are colors, fonts, and spacing faithful to the spec?
- Are all spec elements present in the implementation?
- Is the responsive behavior correct (if visible)?
- Rate spec adherence percentage.
Respond in this exact JSON format:
{
"discrepancies": [
{
"region": "string",
"severity": "critical|major|minor|cosmetic",
"description": "string",
"before": "string",
"after": "string",
"spec_match": true|false
}
],
"spec_adherence_percent": 0-100,
"overall_quality": 0-100,
"summary": "string"
}"""
# === Core Review Logic ===
def parse_vision_response(raw: str) -> dict:
"""Parse the JSON response from the vision model, handling markdown fences."""
cleaned = raw.strip()
# Strip markdown code fences
if cleaned.startswith("```"):
lines = cleaned.split("\n")
# Remove first line (```json) and last line (```)
if lines[0].startswith("```"):
lines = lines[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
cleaned = "\n".join(lines)
try:
return json.loads(cleaned)
except json.JSONDecodeError:
# Try to find JSON in the response
start = cleaned.find("{")
end = cleaned.rfind("}")
if start >= 0 and end > start:
return json.loads(cleaned[start:end + 1])
raise ValueError(f"Could not parse vision response as JSON:\n{raw[:500]}")
def review_before_after(before_path: str, after_path: str, spec_path: Optional[str] = None,
model: str = VISION_MODEL) -> ReviewResult:
"""Run a visual review comparing before/after screenshots, optionally against a spec."""
result = ReviewResult(
model_used=model,
images_reviewed={
"before": before_path,
"after": after_path,
"spec": spec_path or "(none)"
}
)
return {"status": "PASS" if "PASS" in analysis.upper() else "FAIL", "analysis": analysis}
if __name__ == '__main__':
print(json.dumps(review_pr(), indent=2))
# Validate inputs
for label, path in [("before", before_path), ("after", after_path)]:
if not Path(path).exists():
result.status = Status.FAIL
result.summary = f"Missing {label} image: {path}"
return result
if spec_path and not Path(spec_path).exists():
result.status = Status.WARN
result.summary = f"Spec image not found: {spec_path}. Running without spec comparison."
spec_path = None
# Build image list and prompt
images = [before_path, after_path]
if spec_path:
images.append(spec_path)
prompt = SPEC_COMPARISON_PROMPT
else:
prompt = DIFF_ANALYSIS_PROMPT
# Call vision model
print(f" Analyzing {len(images)} image(s) with {model}...", file=sys.stderr)
raw_response = analyze_with_vision(prompt, images, model)
# Parse response
try:
parsed = parse_vision_response(raw_response)
except (json.JSONDecodeError, ValueError) as e:
result.status = Status.WARN
result.summary = f"Failed to parse vision response: {e}"
return result
# Build discrepancies
for d in parsed.get("discrepancies", []):
result.discrepancies.append(Discrepancy(
region=d.get("region", "unknown"),
severity=d.get("severity", "minor"),
description=d.get("description", ""),
before=d.get("before", ""),
after=d.get("after", ""),
spec_match=d.get("spec_match", True)
))
# Score
result.score = parsed.get("overall_quality", parsed.get("spec_adherence_percent", 50))
result.summary = parsed.get("summary", "Analysis complete.")
# Spec adherence
if spec_path:
result.spec_adherence = {
"percent": parsed.get("spec_adherence_percent", 0),
"spec_file": spec_path
}
# Determine status
criticals = sum(1 for d in result.discrepancies if d.severity == "critical")
majors = sum(1 for d in result.discrepancies if d.severity == "major")
if criticals > 0:
result.status = Status.FAIL
elif majors > 0 or result.score < 70:
result.status = Status.WARN
else:
result.status = Status.PASS
return result
# === Gitea PR Integration ===
def get_gitea_token() -> str:
"""Read Gitea token from standard locations."""
token_paths = [
Path.home() / ".config" / "gitea" / "token",
Path.home() / ".timmy" / "gitea_token",
]
for p in token_paths:
if p.exists():
return p.read_text().strip()
return os.environ.get("GITEA_TOKEN", "")
def gitea_api(path: str, token: str = "") -> Optional[dict]:
"""Call Gitea API."""
if not token:
token = get_gitea_token()
url = f"{GITEA_BASE}{path}"
req = urllib.request.Request(url, headers={"Authorization": f"token {token}"})
try:
with urllib.request.urlopen(req, timeout=30) as resp:
return json.loads(resp.read())
except Exception as e:
print(f" Gitea API error: {e}", file=sys.stderr)
return None
def fetch_pr_screenshots(repo: str, pr_num: int, output_dir: str) -> dict:
"""
Fetch before/after screenshots from a PR.
Looks for:
1. Image files changed in the PR
2. Screenshot attachments in PR comments
3. CI-generated screenshots (if available)
Returns dict with 'before' and 'after' paths, or empty if none found.
"""
pr = gitea_api(f"/repos/{repo}/pulls/{pr_num}")
if not pr:
return {}
# Get changed files
files = gitea_api(f"/repos/{repo}/pulls/{pr_num}/files") or []
image_exts = {".png", ".jpg", ".jpeg", ".gif", ".webp"}
image_files = [f for f in files if Path(f.get("filename", "")).suffix.lower() in image_exts]
result = {}
if image_files:
# Download the first changed image as "after"
for img in image_files:
raw_url = img.get("raw_url", "")
if raw_url:
after_path = os.path.join(output_dir, f"after_{Path(img['filename']).name}")
try:
urllib.request.urlretrieve(raw_url, after_path)
result["after"] = after_path
break
except Exception:
continue
return result
def review_pr_visual(repo: str, pr_num: int, spec_path: Optional[str] = None) -> ReviewResult:
"""Review visual changes in a PR."""
with tempfile.TemporaryDirectory() as tmpdir:
screenshots = fetch_pr_screenshots(repo, pr_num, tmpdir)
if "before" not in screenshots or "after" not in screenshots:
return ReviewResult(
status=Status.WARN,
summary=f"No before/after screenshots found in PR #{pr_num}. "
f"To use visual review, add screenshot attachments to the PR or "
f"include image files in the diff."
)
return review_before_after(
screenshots["before"],
screenshots["after"],
spec_path
)
# === Batch Review ===
def review_batch(directory: str, spec_path: Optional[str] = None) -> list[ReviewResult]:
"""Review all before/after pairs in a directory.
Expected naming: before_*.png and after_*.png, or *_before.png and *_after.png.
"""
dir_path = Path(directory)
results = []
# Find pairs
befores = sorted(dir_path.glob("*before*"))
for before in befores:
name = before.stem.replace("before", "").replace("_", "").strip("_")
# Look for matching after
after_candidates = list(dir_path.glob(f"*{name}*after*")) or list(dir_path.glob(f"*after*{name}*"))
if after_candidates:
after = after_candidates[0]
print(f" Reviewing pair: {before.name} / {after.name}", file=sys.stderr)
result = review_before_after(str(before), str(after), spec_path)
result.images_reviewed["pair_name"] = name
results.append(result)
if not results:
results.append(ReviewResult(
status=Status.WARN,
summary=f"No before/after pairs found in {directory}"
))
return results
# === Output Formatting ===
def format_result(result: ReviewResult, format: str = "json") -> str:
"""Format a review result for output."""
if format == "json":
output = {
"status": result.status.value,
"score": result.score,
"discrepancies": [asdict(d) for d in result.discrepancies],
"spec_adherence": result.spec_adherence,
"summary": result.summary,
"model_used": result.model_used,
"images_reviewed": result.images_reviewed,
}
return json.dumps(output, indent=2)
elif format == "text":
lines = []
lines.append(f"=== Visual PR Review ===")
lines.append(f"Status: {result.status.value}")
lines.append(f"Score: {result.score}/100")
lines.append(f"Model: {result.model_used}")
lines.append(f"Images: {json.dumps(result.images_reviewed)}")
lines.append("")
if result.spec_adherence:
lines.append(f"Spec Adherence: {result.spec_adherence.get('percent', '?')}%")
lines.append("")
if result.discrepancies:
lines.append(f"Discrepancies ({len(result.discrepancies)}):")
for i, d in enumerate(result.discrepancies, 1):
sev_marker = {"critical": "🔴", "major": "🟡", "minor": "🔵", "cosmetic": ""}.get(d.severity, "")
lines.append(f" {i}. {sev_marker} [{d.severity.upper()}] {d.region}")
lines.append(f" {d.description}")
if d.before or d.after:
lines.append(f" Before: {d.before}")
lines.append(f" After: {d.after}")
lines.append("")
else:
lines.append("No discrepancies found.")
lines.append("")
lines.append(f"Summary: {result.summary}")
return "\n".join(lines)
else:
raise ValueError(f"Unknown format: {format}")
# === CLI ===
def main():
parser = argparse.ArgumentParser(
description="Visual PR Review Tool — compare UI screenshots with multimodal vision model",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --before before.png --after after.png
%(prog)s --before before.png --after after.png --spec figma-export.png
%(prog)s --repo Timmy_Foundation/the-beacon --pr 116
%(prog)s --batch ./screenshots/
"""
)
# Input modes
group = parser.add_mutually_exclusive_group(required=True)
group.add_argument("--before", help="Before screenshot path")
group.add_argument("--repo", help="Gitea repo (owner/name) for PR review")
group.add_argument("--batch", help="Directory of before/after screenshot pairs")
parser.add_argument("--after", help="After screenshot path (required with --before)")
parser.add_argument("--spec", help="Design spec image (Figma export, wireframe)")
parser.add_argument("--pr", type=int, help="PR number (required with --repo)")
parser.add_argument("--model", default=VISION_MODEL, help=f"Vision model (default: {VISION_MODEL})")
parser.add_argument("--format", choices=["json", "text"], default="json", help="Output format")
parser.add_argument("--output", "-o", help="Output file (default: stdout)")
args = parser.parse_args()
# Validate combinations
if args.before and not args.after:
parser.error("--after is required when using --before")
if args.repo and not args.pr:
parser.error("--pr is required when using --repo")
# Run review
if args.before:
result = review_before_after(args.before, args.after, args.spec, args.model)
output = format_result(result, args.format)
elif args.repo:
result = review_pr_visual(args.repo, args.pr, args.spec)
output = format_result(result, args.format)
elif args.batch:
results = review_batch(args.batch, args.spec)
if args.format == "json":
output = json.dumps([json.loads(format_result(r, "json")) for r in results], indent=2)
else:
output = "\n---\n".join(format_result(r, "text") for r in results)
# Write output
if args.output:
Path(args.output).write_text(output)
print(f"Results written to {args.output}", file=sys.stderr)
else:
print(output)
# Exit code based on status
if isinstance(result, ReviewResult) and result.status == Status.FAIL:
sys.exit(1)
if __name__ == "__main__":
main()

View File

@@ -1,123 +0,0 @@
#!/usr/bin/env python3
"""Tests for nexus_smoke_test.py — verifies smoke test logic."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from nexus_smoke_test import (
Severity, SmokeCheck, SmokeResult,
format_result, _parse_json_response,
)
def test_parse_json_clean():
result = _parse_json_response('{"status": "PASS", "summary": "ok"}')
assert result["status"] == "PASS"
print(" PASS: test_parse_json_clean")
def test_parse_json_fenced():
result = _parse_json_response('```json\n{"status": "FAIL"}\n```')
assert result["status"] == "FAIL"
print(" PASS: test_parse_json_fenced")
def test_parse_json_garbage():
result = _parse_json_response("no json here")
assert result == {}
print(" PASS: test_parse_json_garbage")
def test_smoke_check_dataclass():
c = SmokeCheck(name="Test", status=Severity.PASS, message="All good")
assert c.name == "Test"
assert c.status == Severity.PASS
print(" PASS: test_smoke_check_dataclass")
def test_smoke_result_dataclass():
r = SmokeResult(url="https://example.com", status=Severity.PASS)
r.checks.append(SmokeCheck(name="Page Loads", status=Severity.PASS))
assert len(r.checks) == 1
assert r.url == "https://example.com"
print(" PASS: test_smoke_result_dataclass")
def test_format_json():
r = SmokeResult(url="https://test.com", status=Severity.PASS, summary="All good", duration_ms=100)
r.checks.append(SmokeCheck(name="Test", status=Severity.PASS, message="OK"))
output = format_result(r, "json")
parsed = json.loads(output)
assert parsed["status"] == "pass"
assert parsed["url"] == "https://test.com"
assert len(parsed["checks"]) == 1
print(" PASS: test_format_json")
def test_format_text():
r = SmokeResult(url="https://test.com", status=Severity.WARN, summary="1 warning", duration_ms=200)
r.checks.append(SmokeCheck(name="Screenshot", status=Severity.WARN, message="No backend"))
output = format_result(r, "text")
assert "NEXUS VISUAL SMOKE TEST" in output
assert "https://test.com" in output
assert "WARN" in output
print(" PASS: test_format_text")
def test_format_text_pass():
r = SmokeResult(url="https://test.com", status=Severity.PASS, summary="All clear")
r.checks.append(SmokeCheck(name="Page Loads", status=Severity.PASS, message="HTTP 200"))
r.checks.append(SmokeCheck(name="HTML Content", status=Severity.PASS, message="Valid"))
output = format_result(r, "text")
assert "" in output
assert "Page Loads" in output
print(" PASS: test_format_text")
def test_severity_enum():
assert Severity.PASS.value == "pass"
assert Severity.FAIL.value == "fail"
assert Severity.WARN.value == "warn"
print(" PASS: test_severity_enum")
def test_overall_status_logic():
# All pass
r = SmokeResult()
r.checks = [SmokeCheck(name="a", status=Severity.PASS), SmokeCheck(name="b", status=Severity.PASS)]
fails = sum(1 for c in r.checks if c.status == Severity.FAIL)
warns = sum(1 for c in r.checks if c.status == Severity.WARN)
assert fails == 0 and warns == 0
# One fail
r.checks.append(SmokeCheck(name="c", status=Severity.FAIL))
fails = sum(1 for c in r.checks if c.status == Severity.FAIL)
assert fails == 1
print(" PASS: test_overall_status_logic")
def run_all():
print("=== nexus_smoke_test tests ===")
tests = [
test_parse_json_clean, test_parse_json_fenced, test_parse_json_garbage,
test_smoke_check_dataclass, test_smoke_result_dataclass,
test_format_json, test_format_text, test_format_text_pass,
test_severity_enum, test_overall_status_logic,
]
passed = failed = 0
for t in tests:
try:
t()
passed += 1
except Exception as e:
print(f" FAIL: {t.__name__}{e}")
failed += 1
print(f"\n{'ALL PASSED' if failed == 0 else f'{failed} FAILED'}: {passed}/{len(tests)}")
return failed == 0
if __name__ == "__main__":
sys.exit(0 if run_all() else 1)

View File

@@ -1,215 +0,0 @@
#!/usr/bin/env python3
"""Tests for tower_visual_mapper.py — verifies map construction and formatting."""
import json
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from tower_visual_mapper import (
TowerRoom, TowerNPC, TowerFloor, TowerMap,
scan_gallery_index, scan_memory_architecture, scan_wizard_configs,
build_tower_map, to_json, to_ascii, _gallery_image_to_room,
_parse_json_response
)
# === Unit Tests ===
def test_gallery_image_to_room_known():
room = _gallery_image_to_room("01-wizard-tower-bitcoin.jpg", "The Tower", "The Origin")
assert room is not None
assert room.name == "The Tower — Exterior"
assert room.floor == 0
assert "bitcoin" in room.description.lower() or "sovereign" in room.description.lower()
print(" PASS: test_gallery_image_to_room_known")
def test_gallery_image_to_room_unknown():
room = _gallery_image_to_room("random-image.jpg", "Something", "The Origin")
assert room is None
print(" PASS: test_gallery_image_to_room_unknown")
def test_gallery_image_to_room_philosophy():
room = _gallery_image_to_room("06-the-paperclip-moment.jpg", "A paperclip", "The Philosophy")
assert room is not None
assert room.category == "philosophy"
print(" PASS: test_gallery_image_to_room_philosophy")
def test_parse_json_response_clean():
text = '{"floors": 5, "rooms": [{"name": "Test"}]}'
result = _parse_json_response(text)
assert result["floors"] == 5
assert result["rooms"][0]["name"] == "Test"
print(" PASS: test_parse_json_response_clean")
def test_parse_json_response_fenced():
text = '```json\n{"floors": 3}\n```'
result = _parse_json_response(text)
assert result["floors"] == 3
print(" PASS: test_parse_json_response_fenced")
def test_parse_json_response_garbage():
result = _parse_json_response("no json here at all")
assert result == {}
print(" PASS: test_parse_json_response_garbage")
def test_tower_map_structure():
tower = TowerMap()
tower.rooms = [
TowerRoom(name="Room A", floor=0, category="test"),
TowerRoom(name="Room B", floor=0, category="test"),
TowerRoom(name="Room C", floor=1, category="other"),
]
tower.npcs = [
TowerNPC(name="NPC1", role="guard", location="Room A"),
]
output = json.loads(to_json(tower))
assert output["name"] == "The Tower"
assert output["stats"]["total_rooms"] == 3
assert output["stats"]["total_npcs"] == 1
print(" PASS: test_tower_map_structure")
def test_to_json():
tower = TowerMap()
tower.rooms = [TowerRoom(name="Test Room", floor=1)]
output = json.loads(to_json(tower))
assert output["rooms"][0]["name"] == "Test Room"
assert output["rooms"][0]["floor"] == 1
print(" PASS: test_to_json")
def test_to_ascii():
tower = TowerMap()
tower.floors = [TowerFloor(number=0, name="Ground", rooms=["Test Room"])]
tower.rooms = [TowerRoom(name="Test Room", floor=0, description="A test")]
tower.npcs = []
tower.connections = []
output = to_ascii(tower)
assert "THE TOWER" in output
assert "Test Room" in output
assert "FLOOR 0" in output
print(" PASS: test_to_ascii")
def test_to_ascii_with_npcs():
tower = TowerMap()
tower.floors = [TowerFloor(number=0, name="Ground", rooms=["The Forge"])]
tower.rooms = [TowerRoom(name="The Forge", floor=0, occupants=["Bezalel"])]
tower.npcs = [TowerNPC(name="Bezalel", role="Builder", location="The Forge")]
output = to_ascii(tower)
assert "Bezalel" in output
print(" PASS: test_to_ascii_with_npcs")
def test_scan_gallery_index(tmp_path):
# Create mock gallery
gallery = tmp_path / "grok-imagine-gallery"
gallery.mkdir()
index = gallery / "INDEX.md"
index.write_text("""# Gallery
### The Origin
| 01 | wizard-tower-bitcoin.jpg | The Tower, sovereign |
| 02 | soul-inscription.jpg | SOUL.md glowing |
### The Philosophy
| 05 | value-drift-battle.jpg | Blue vs red ships |
""")
rooms = scan_gallery_index(tmp_path)
assert len(rooms) >= 2
names = [r.name for r in rooms]
assert any("Tower" in n for n in names)
assert any("Inscription" in n for n in names)
print(" PASS: test_scan_gallery_index")
def test_scan_wizard_configs(tmp_path):
wizards = tmp_path / "wizards"
for name in ["timmy", "bezalel", "ezra"]:
wdir = wizards / name
wdir.mkdir(parents=True)
(wdir / "config.yaml").write_text("model: test\n")
npcs = scan_wizard_configs(tmp_path)
assert len(npcs) >= 3
names = [n.name for n in npcs]
assert any("Timmy" in n for n in names)
assert any("Bezalel" in n for n in names)
print(" PASS: test_scan_wizard_configs")
def test_build_tower_map_empty(tmp_path):
tower = build_tower_map(tmp_path, include_vision=False)
assert tower.name == "The Tower"
# Should still have palace rooms from MEMORY_ARCHITECTURE (won't exist in tmp, but that's fine)
assert isinstance(tower.rooms, list)
print(" PASS: test_build_tower_map_empty")
def test_room_deduplication():
tower = TowerMap()
tower.rooms = [
TowerRoom(name="Dup Room", floor=0),
TowerRoom(name="Dup Room", floor=1), # same name, different floor
TowerRoom(name="Unique Room", floor=0),
]
# Deduplicate in build_tower_map — simulate
seen = {}
deduped = []
for room in tower.rooms:
if room.name not in seen:
seen[room.name] = True
deduped.append(room)
assert len(deduped) == 2
print(" PASS: test_room_deduplication")
def run_all():
print("=== tower_visual_mapper tests ===")
tests = [
test_gallery_image_to_room_known,
test_gallery_image_to_room_unknown,
test_gallery_image_to_room_philosophy,
test_parse_json_response_clean,
test_parse_json_response_fenced,
test_parse_json_response_garbage,
test_tower_map_structure,
test_to_json,
test_to_ascii,
test_to_ascii_with_npcs,
test_scan_gallery_index,
test_scan_wizard_configs,
test_build_tower_map_empty,
test_room_deduplication,
]
passed = 0
failed = 0
for test in tests:
try:
if "tmp_path" in test.__code__.co_varnames:
with tempfile.TemporaryDirectory() as td:
test(Path(td))
else:
test()
passed += 1
except Exception as e:
print(f" FAIL: {test.__name__}{e}")
failed += 1
print(f"\n{'ALL PASSED' if failed == 0 else f'{failed} FAILED'}: {passed}/{len(tests)}")
return failed == 0
if __name__ == "__main__":
sys.exit(0 if run_all() else 1)

View File

@@ -0,0 +1,167 @@
#!/usr/bin/env python3
"""Tests for visual_pr_reviewer.py — verifies parsing, status logic, and output formatting."""
import json
import sys
import os
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
# Add repo scripts to path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from visual_pr_reviewer import (
Status, Discrepancy, ReviewResult,
parse_vision_response, format_result, review_before_after
)
def test_parse_vision_response_clean_json():
raw = '{"discrepancies": [], "overall_quality": 95, "summary": "Looks good"}'
parsed = parse_vision_response(raw)
assert parsed["overall_quality"] == 95
assert parsed["summary"] == "Looks good"
print(" PASS: test_parse_vision_response_clean_json")
def test_parse_vision_response_markdown_fences():
raw = '''```json
{
"discrepancies": [{"region": "header", "severity": "minor", "description": "color shift"}],
"overall_quality": 80,
"summary": "Minor changes"
}
```'''
parsed = parse_vision_response(raw)
assert len(parsed["discrepancies"]) == 1
assert parsed["discrepancies"][0]["region"] == "header"
print(" PASS: test_parse_vision_response_markdown_fences")
def test_parse_vision_response_embedded_json():
raw = '''Here's the analysis:
{"discrepancies": [], "overall_quality": 70, "summary": "OK"}
That's the result.'''
parsed = parse_vision_response(raw)
assert parsed["overall_quality"] == 70
print(" PASS: test_parse_vision_response_embedded_json")
def test_status_critical_is_fail():
result = ReviewResult(
discrepancies=[
Discrepancy(region="button", severity="critical", description="missing"),
],
score=40
)
criticals = sum(1 for d in result.discrepancies if d.severity == "critical")
assert criticals > 0 # Would set status to FAIL
print(" PASS: test_status_critical_is_fail")
def test_status_major_is_warn():
result = ReviewResult(
discrepancies=[
Discrepancy(region="header", severity="major", description="layout shift"),
],
score=75
)
majors = sum(1 for d in result.discrepancies if d.severity == "major")
assert majors > 0 # Would set status to WARN
print(" PASS: test_status_major_is_warn")
def test_status_clean_is_pass():
result = ReviewResult(
discrepancies=[],
score=100
)
assert result.score == 100
assert len(result.discrepancies) == 0
print(" PASS: test_status_clean_is_pass")
def test_format_json():
result = ReviewResult(
status=Status.PASS,
score=95,
summary="Clean review",
model_used="gemma3:12b"
)
output = format_result(result, "json")
parsed = json.loads(output)
assert parsed["status"] == "PASS"
assert parsed["score"] == 95
print(" PASS: test_format_json")
def test_format_text():
result = ReviewResult(
status=Status.WARN,
score=70,
discrepancies=[
Discrepancy(region="sidebar", severity="minor", description="spacing changed"),
],
summary="Minor issues found",
model_used="gemma3:12b"
)
output = format_result(result, "text")
assert "WARN" in output
assert "70/100" in output
assert "sidebar" in output
print(" PASS: test_format_text")
def test_missing_before_image():
result = review_before_after("/nonexistent/before.png", "/nonexistent/after.png")
assert result.status == Status.FAIL
assert "Missing before image" in result.summary
print(" PASS: test_missing_before_image")
def test_discrepancy_dataclass():
d = Discrepancy(
region="header",
severity="major",
description="Color changed from blue to red",
before="blue",
after="red",
spec_match=False
)
assert d.region == "header"
assert d.severity == "major"
assert d.spec_match is False
print(" PASS: test_discrepancy_dataclass")
def run_all():
print("=== visual_pr_reviewer tests ===")
tests = [
test_parse_vision_response_clean_json,
test_parse_vision_response_markdown_fences,
test_parse_vision_response_embedded_json,
test_status_critical_is_fail,
test_status_major_is_warn,
test_status_clean_is_pass,
test_format_json,
test_format_text,
test_missing_before_image,
test_discrepancy_dataclass,
]
passed = 0
failed = 0
for test in tests:
try:
test()
passed += 1
except Exception as e:
print(f" FAIL: {test.__name__}{e}")
failed += 1
print(f"\n{'ALL PASSED' if failed == 0 else f'{failed} FAILED'}: {passed}/{len(tests)}")
return failed == 0
if __name__ == "__main__":
sys.exit(0 if run_all() else 1)