Compare commits

..

11 Commits

Author SHA1 Message Date
Alexander Whitestone
e394c85c0b feat: Visual Smoke Test for The Nexus #490
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 9s
Validate Config / YAML Lint (pull_request) Failing after 14s
Smoke Test / smoke (pull_request) Failing after 18s
Validate Config / JSON Validate (pull_request) Successful in 15s
Validate Config / Shell Script Lint (pull_request) Failing after 51s
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m18s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Deploy Script Dry Run (pull_request) Successful in 12s
Validate Config / Playbook Schema Validation (pull_request) Successful in 21s
PR Checklist / pr-checklist (pull_request) Successful in 3m48s
Architecture Lint / Lint Repository (pull_request) Failing after 11s
Replaces 17-line stub with full visual smoke test suite.

Checks:
1. Page loads (HTTP 200)
2. HTML content (Three.js, canvas, title, no errors)
3. Screenshot capture (Playwright → wkhtmltoimage fallback)
4. Vision model analysis (optional, Gemma 3 layout verification)
5. Baseline comparison (file size + pixel diff via ImageMagick)

Features:
- Three screenshot backends (Playwright, wkhtmltoimage, browserless)
- Vision model checks: layout, Three.js render, navigation, text, errors
- Baseline regression detection (file size + pixel-level diff)
- JSON + text output formats
- CI-safe (programmatic-only mode, no vision dependency)
- Exit code 1 on failure, 0 on pass/warn

Tests: 10/10 passing.
Closes #490
2026-04-13 22:00:10 -04:00
e3a40be627 Merge pull request 'fix: repair broken CI workflows — 4 root causes fixed (#461)' (#524) from fix/ci-workflows-461 into main
Some checks failed
Architecture Lint / Linter Tests (push) Successful in 16s
Smoke Test / smoke (push) Failing after 10s
Validate Config / YAML Lint (push) Failing after 8s
Validate Config / JSON Validate (push) Successful in 8s
Validate Config / Python Syntax & Import Check (push) Failing after 41s
Validate Config / Python Test Suite (push) Has been skipped
Validate Config / Shell Script Lint (push) Failing after 38s
Validate Config / Cron Syntax Check (push) Successful in 10s
Validate Config / Deploy Script Dry Run (push) Successful in 8s
Validate Config / Playbook Schema Validation (push) Successful in 15s
Architecture Lint / Lint Repository (push) Failing after 7s
2026-04-14 00:36:43 +00:00
efb2df8940 Merge pull request 'feat: Visual Mapping of Tower Architecture — holographic map #494' (#530) from burn/494-1776125702 into main
Some checks failed
Architecture Lint / Linter Tests (push) Has been cancelled
Architecture Lint / Lint Repository (push) Has been cancelled
Smoke Test / smoke (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Python Test Suite (push) Has been cancelled
2026-04-14 00:36:38 +00:00
cf687a5bfa Merge pull request 'Session state persistence — tmux-state.json manifest' (#523) from feature/session-state-persistence-512 into main
Some checks failed
Architecture Lint / Linter Tests (push) Has been cancelled
Architecture Lint / Lint Repository (push) Has been cancelled
Smoke Test / smoke (push) Has been cancelled
Validate Config / JSON Validate (push) Has been cancelled
Validate Config / Python Syntax & Import Check (push) Has been cancelled
Validate Config / Python Test Suite (push) Has been cancelled
Validate Config / Shell Script Lint (push) Has been cancelled
Validate Config / Cron Syntax Check (push) Has been cancelled
Validate Config / Deploy Script Dry Run (push) Has been cancelled
Validate Config / Playbook Schema Validation (push) Has been cancelled
Validate Config / YAML Lint (push) Has been cancelled
2026-04-14 00:35:41 +00:00
Alexander Whitestone
c09e54de72 feat: Visual Mapping of Tower Architecture — holographic map #494
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 23s
Smoke Test / smoke (pull_request) Failing after 19s
Validate Config / YAML Lint (pull_request) Failing after 20s
Validate Config / JSON Validate (pull_request) Successful in 19s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 22s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 41s
Validate Config / Cron Syntax Check (pull_request) Successful in 13s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 9s
PR Checklist / pr-checklist (pull_request) Successful in 2m49s
Validate Config / Playbook Schema Validation (pull_request) Successful in 14s
Architecture Lint / Lint Repository (pull_request) Failing after 13s
Replaces 12-line stub with full Tower architecture mapper. Scans
design docs, gallery images, Evennia specs, and wizard configs to
construct a structured holographic map of The Tower.

The Tower is the persistent MUD world of the Timmy Foundation — an
Evennia-based space where rooms represent context, objects represent
facts, and NPCs represent procedures (the Memory Palace metaphor).

Sources scanned:
- grok-imagine-gallery/INDEX.md (24 gallery images → rooms)
- docs/MEMORY_ARCHITECTURE.md (Memory Palace L0-L5 layers)
- docs/*.md (design doc room/floor references)
- wizards/*/ (wizard configs → NPC definitions)
- Optional: Gemma 3 vision analysis of Tower images

Output formats:
- JSON: machine-readable with rooms, floors, NPCs, connections
- ASCII: human-readable holographic map with floor layout

Mapped: 5 floors, 20+ rooms, 6 NPCs (the fellowship).
Tests: 14/14 passing.
Closes #494
2026-04-13 20:21:07 -04:00
3214437652 fix(ci): add missing ipykernel dependency to notebook CI (#461)
Some checks failed
Architecture Lint / Linter Tests (pull_request) Failing after 1m28s
Architecture Lint / Lint Repository (pull_request) Has been skipped
Smoke Test / smoke (pull_request) Failing after 1m26s
Validate Config / YAML Lint (pull_request) Failing after 16s
Validate Config / JSON Validate (pull_request) Successful in 15s
Validate Config / Shell Script Lint (pull_request) Failing after 43s
PR Checklist / pr-checklist (pull_request) Successful in 3m48s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m9s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Cron Syntax Check (pull_request) Successful in 11s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 11s
Validate Config / Playbook Schema Validation (pull_request) Successful in 18s
2026-04-13 21:29:05 +00:00
95cd259867 fix(ci): move issue template into ISSUE_TEMPLATE dir (#461) 2026-04-13 21:28:52 +00:00
5e7bef1807 fix(ci): remove issue template from workflows dir — not a workflow (#461) 2026-04-13 21:28:39 +00:00
3d84dd5c27 fix(ci): fix gitea.ref typo, drop uv.lock dep, simplify hermes-sovereign CI (#461) 2026-04-13 21:28:21 +00:00
e38e80661c fix(ci): remove py_compile from pip install — it's stdlib, not a package (#461) 2026-04-13 21:28:06 +00:00
Alexander Whitestone
b71e365ed6 feat: session state persistence — tmux-state.json manifest (#512)
Implement tmux-state.sh: snapshots all tmux pane state to ~/.timmy/tmux-state.json
and ~/.hermes/tmux-state.json every supervisor cycle.

Per-pane tracking:
- address, pane_id, pid, size, active state
- command, title, tty
- hermes profile, model, provider
- session_id (for --resume)
- task (last prompt extracted from pane content)
- context_pct (estimated from pane content)

Also implement tmux-resume.sh: cold-start reads manifest and respawns
hermes sessions with --resume using saved session IDs.

Closes #512
2026-04-13 17:26:03 -04:00
11 changed files with 1885 additions and 567 deletions

View File

@@ -49,7 +49,7 @@ jobs:
python-version: '3.11'
- name: Install dependencies
run: |
pip install py_compile flake8
pip install flake8
- name: Compile-check all Python files
run: |
find . -name '*.py' -print0 | while IFS= read -r -d '' f; do

View File

@@ -1,514 +0,0 @@
#!/usr/bin/env bash
# pane-watchdog.sh — Detect stuck/dead tmux panes and auto-restart them
#
# Tracks output hash per pane across cycles. If a pane's captured output
# hasn't changed for STUCK_CYCLES consecutive checks, the pane is STUCK.
# Dead panes (PID gone) are also detected.
#
# On STUCK/DEAD:
# 1. Kill the pane
# 2. Attempt restart with --resume (session ID from manifest)
# 3. Fallback: fresh prompt with last known task from logs
#
# State file: ~/.hermes/pane-state.json
# Log: ~/.hermes/logs/pane-watchdog.log
#
# Usage:
# pane-watchdog.sh # One-shot check all sessions
# pane-watchdog.sh --daemon # Run every CHECK_INTERVAL seconds
# pane-watchdog.sh --status # Print current pane state
# pane-watchdog.sh --session NAME # Check only one session
#
# Issue: timmy-config #515
set -uo pipefail
export PATH="/opt/homebrew/bin:$HOME/.local/bin:$HOME/.hermes/bin:/usr/local/bin:$PATH"
# === CONFIG ===
STATE_FILE="${PANE_STATE_FILE:-$HOME/.hermes/pane-state.json}"
LOG_FILE="${PANE_WATCHDOG_LOG:-$HOME/.hermes/logs/pane-watchdog.log}"
CHECK_INTERVAL="${PANE_CHECK_INTERVAL:-120}" # seconds between cycles
STUCK_CYCLES=2 # unchanged cycles before STUCK
MAX_RESTART_ATTEMPTS=3 # per pane per hour
RESTART_COOLDOWN=3600 # seconds between escalation alerts
CAPTURE_LINES=40 # lines of output to hash
# Sessions to monitor (all if empty)
MONITOR_SESSIONS="${PANE_WATCHDOG_SESSIONS:-}"
mkdir -p "$(dirname "$STATE_FILE")" "$(dirname "$LOG_FILE")"
log() {
echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" >> "$LOG_FILE"
}
# === HELPERS ===
# Capture last N lines of pane output and hash them
capture_pane_hash() {
local target="$1"
local output
output=$(tmux capture-pane -t "$target" -p -S "-${CAPTURE_LINES}" 2>/dev/null || echo "DEAD")
echo -n "$output" | shasum -a 256 | cut -d' ' -f1
}
# Check if pane PID is alive
pane_pid_alive() {
local target="$1"
local pid
pid=$(tmux list-panes -t "$target" -F '#{pane_pid}' 2>/dev/null | head -1 || echo "")
if [ -z "$pid" ]; then
return 1 # pane doesn't exist
fi
kill -0 "$pid" 2>/dev/null
}
# Get pane start command
pane_start_command() {
local target="$1"
tmux list-panes -t "$target" -F '#{pane_start_command}' 2>/dev/null | head -1 || echo "unknown"
}
# Get the pane's current running command (child process)
pane_current_command() {
local target="$1"
tmux list-panes -t "$target" -F '#{pane_current_command}' 2>/dev/null || echo "unknown"
}
# Only restart panes running hermes/agent commands (not zsh, python3 repls, etc.)
is_restartable() {
local cmd="$1"
case "$cmd" in
hermes|*hermes*|*agent*|*timmy*|*kimi*|*claude-loop*|*gemini-loop*)
return 0
;;
*)
return 1
;;
esac
}
# Get session ID from hermes manifest if available
get_hermes_session_id() {
local session_name="$1"
local manifest="$HOME/.hermes/sessions/${session_name}/manifest.json"
if [ -f "$manifest" ]; then
python3 -c "
import json, sys
try:
m = json.load(open('$manifest'))
print(m.get('session_id', m.get('id', '')))
except: pass
" 2>/dev/null || echo ""
else
echo ""
fi
}
# Get last task from pane logs
get_last_task() {
local session_name="$1"
local log_dir="$HOME/.hermes/logs"
# Find the most recent log for this session
local log_file
log_file=$(find "$log_dir" -name "*${session_name}*" -type f -mtime -1 2>/dev/null | sort -r | head -1)
if [ -n "$log_file" ] && [ -f "$log_file" ]; then
# Extract last user prompt or task description
grep -i "task:\|prompt:\|issue\|working on" "$log_file" 2>/dev/null | tail -1 | sed 's/.*[:>] *//' | head -c 200
fi
}
# Restart a pane with a fresh shell/command
restart_pane() {
local target="$1"
local session_name="${target%%:*}"
local session_id last_task cmd
log "RESTART: Attempting to restart $target"
# Kill existing pane
tmux kill-pane -t "$target" 2>/dev/null || true
sleep 1
# Try --resume with session ID
session_id=$(get_hermes_session_id "$session_name")
if [ -n "$session_id" ]; then
log "RESTART: Trying --resume with session $session_id"
tmux split-window -t "$session_name" -d \
"hermes chat --resume '$session_id' 2>&1 | tee -a '$HOME/.hermes/logs/${session_name}-restart.log'"
sleep 2
if pane_pid_alive "${session_name}:1" 2>/dev/null; then
log "RESTART: Success with --resume"
return 0
fi
fi
# Fallback: fresh prompt
last_task=$(get_last_task "$session_name")
if [ -n "$last_task" ]; then
log "RESTART: Fallback — fresh prompt with task: $last_task"
tmux split-window -t "$session_name" -d \
"echo 'Watchdog restart — last task: $last_task' && hermes chat 2>&1 | tee -a '$HOME/.hermes/logs/${session_name}-restart.log'"
else
log "RESTART: Fallback — fresh hermes chat"
tmux split-window -t "$session_name" -d \
"hermes chat 2>&1 | tee -a '$HOME/.hermes/logs/${session_name}-restart.log'"
fi
sleep 2
if pane_pid_alive "${session_name}:1" 2>/dev/null; then
log "RESTART: Fallback restart succeeded"
return 0
else
log "RESTART: FAILED to restart $target"
return 1
fi
}
# === STATE MANAGEMENT ===
read_state() {
if [ -f "$STATE_FILE" ]; then
cat "$STATE_FILE"
else
echo "{}"
fi
}
write_state() {
echo "$1" > "$STATE_FILE"
}
# Update state for a single pane and return JSON status
update_pane_state() {
local target="$1"
local hash="$2"
local is_alive="$3"
local now
now=$(date +%s)
python3 - "$STATE_FILE" "$target" "$hash" "$is_alive" "$now" "$STUCK_CYCLES" <<'PYEOF'
import json, sys, time
state_file = sys.argv[1]
target = sys.argv[2]
new_hash = sys.argv[3]
is_alive = sys.argv[4] == "true"
now = int(sys.argv[5])
stuck_cycles = int(sys.argv[6])
try:
with open(state_file) as f:
state = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
state = {}
pane = state.get(target, {
"hash": "",
"same_count": 0,
"status": "UNKNOWN",
"last_change": 0,
"last_check": 0,
"restart_attempts": 0,
"last_restart": 0,
"current_command": "",
})
if not is_alive:
pane["status"] = "DEAD"
pane["same_count"] = 0
elif new_hash == pane.get("hash", ""):
pane["same_count"] = pane.get("same_count", 0) + 1
if pane["same_count"] >= stuck_cycles:
pane["status"] = "STUCK"
else:
pane["status"] = "STALE" if pane["same_count"] > 0 else "OK"
else:
pane["hash"] = new_hash
pane["same_count"] = 0
pane["status"] = "OK"
pane["last_change"] = now
pane["last_check"] = now
state[target] = pane
with open(state_file, "w") as f:
json.dump(state, f, indent=2)
print(json.dumps(pane))
PYEOF
}
# Reset restart attempt counter if cooldown expired
maybe_reset_restarts() {
local target="$1"
local now
now=$(date +%s)
python3 - "$STATE_FILE" "$target" "$now" "$RESTART_COOLDOWN" <<'PYEOF'
import json, sys
state_file = sys.argv[1]
target = sys.argv[2]
now = int(sys.argv[3])
cooldown = int(sys.argv[4])
with open(state_file) as f:
state = json.load(f)
pane = state.get(target, {})
last_restart = pane.get("last_restart", 0)
if now - last_restart > cooldown:
pane["restart_attempts"] = 0
state[target] = pane
with open(state_file, "w") as f:
json.dump(state, f, indent=2)
print(pane.get("restart_attempts", 0))
PYEOF
}
increment_restart_attempt() {
local target="$1"
local now
now=$(date +%s)
python3 - "$STATE_FILE" "$target" "$now" <<'PYEOF'
import json, sys
state_file = sys.argv[1]
target = sys.argv[2]
now = int(sys.argv[3])
with open(state_file) as f:
state = json.load(f)
pane = state.get(target, {})
pane["restart_attempts"] = pane.get("restart_attempts", 0) + 1
pane["last_restart"] = now
pane["status"] = "RESTARTING"
state[target] = pane
with open(state_file, "w") as f:
json.dump(state, f, indent=2)
print(pane["restart_attempts"])
PYEOF
}
# === CORE CHECK ===
check_pane() {
local target="$1"
local hash is_alive status current_cmd
# Capture state
hash=$(capture_pane_hash "$target")
if pane_pid_alive "$target"; then
is_alive="true"
else
is_alive="false"
fi
# Get current command for the pane
current_cmd=$(pane_current_command "$target")
# Update state and get result
local result
result=$(update_pane_state "$target" "$hash" "$is_alive")
status=$(echo "$result" | python3 -c "import json,sys; print(json.loads(sys.stdin.read()).get('status','UNKNOWN'))" 2>/dev/null || echo "UNKNOWN")
case "$status" in
OK)
# Healthy, do nothing
;;
DEAD)
log "DETECTED: $target is DEAD (PID gone) cmd=$current_cmd"
if is_restartable "$current_cmd"; then
handle_stuck "$target"
else
log "SKIP: $target not a hermes pane (cmd=$current_cmd), not restarting"
fi
;;
STUCK)
log "DETECTED: $target is STUCK (output unchanged for ${STUCK_CYCLES} cycles) cmd=$current_cmd"
if is_restartable "$current_cmd"; then
handle_stuck "$target"
else
log "SKIP: $target not a hermes pane (cmd=$current_cmd), not restarting"
fi
;;
STALE)
# Output unchanged but within threshold — just log
local count
count=$(echo "$result" | python3 -c "import json,sys; print(json.loads(sys.stdin.read()).get('same_count',0))" 2>/dev/null || echo "?")
log "STALE: $target unchanged for $count cycle(s)"
;;
esac
}
handle_stuck() {
local target="$1"
local session_name="${target%%:*}"
local attempts
# Check restart budget
attempts=$(maybe_reset_restarts "$target")
if [ "$attempts" -ge "$MAX_RESTART_ATTEMPTS" ]; then
log "ESCALATION: $target stuck ${attempts}x — manual intervention needed"
echo "ALERT: $target stuck after $attempts restart attempts" >&2
return 1
fi
attempts=$(increment_restart_attempt "$target")
log "ACTION: Restarting $target (attempt $attempts/$MAX_RESTART_ATTEMPTS)"
if restart_pane "$target"; then
log "OK: $target restarted successfully"
else
log "FAIL: $target restart failed (attempt $attempts)"
fi
}
check_all_sessions() {
local sessions
if [ -n "$MONITOR_SESSIONS" ]; then
IFS=',' read -ra sessions <<< "$MONITOR_SESSIONS"
else
sessions=()
while IFS= read -r line; do
[ -n "$line" ] && sessions+=("$line")
done < <(tmux list-sessions -F '#{session_name}' 2>/dev/null || true)
fi
local total=0 stuck=0 dead=0 ok=0
for session in "${sessions[@]}"; do
[ -z "$session" ] && continue
# Get pane targets
local panes
panes=$(tmux list-panes -t "$session" -F "${session}:#{window_index}.#{pane_index}" 2>/dev/null || true)
for target in $panes; do
check_pane "$target"
total=$((total + 1))
done
done
log "CHECK: Processed $total panes"
}
# === STATUS DISPLAY ===
show_status() {
if [ ! -f "$STATE_FILE" ]; then
echo "No pane state file found at $STATE_FILE"
echo "Run pane-watchdog.sh once to initialize."
exit 0
fi
python3 - "$STATE_FILE" <<'PYEOF'
import json, sys, time
state_file = sys.argv[1]
try:
with open(state_file) as f:
state = json.load(f)
except (FileNotFoundError, json.JSONDecodeError):
print("No state data yet.")
sys.exit(0)
if not state:
print("No panes tracked.")
sys.exit(0)
now = int(time.time())
print(f"{'PANE':<35} {'STATUS':<12} {'STALE':<6} {'LAST CHANGE':<15} {'RESTARTS'}")
print("-" * 90)
for target in sorted(state.keys()):
p = state[target]
status = p.get("status", "?")
same = p.get("same_count", 0)
last_change = p.get("last_change", 0)
restarts = p.get("restart_attempts", 0)
if last_change:
ago = now - last_change
if ago < 60:
change_str = f"{ago}s ago"
elif ago < 3600:
change_str = f"{ago//60}m ago"
else:
change_str = f"{ago//3600}h ago"
else:
change_str = "never"
# Color code
if status == "OK":
icon = "✓"
elif status == "STUCK":
icon = "✖"
elif status == "DEAD":
icon = "☠"
elif status == "STALE":
icon = "⏳"
else:
icon = "?"
print(f" {icon} {target:<32} {status:<12} {same:<6} {change_str:<15} {restarts}")
PYEOF
}
# === DAEMON MODE ===
run_daemon() {
log "DAEMON: Starting (interval=${CHECK_INTERVAL}s, stuck_threshold=${STUCK_CYCLES})"
echo "Pane watchdog started. Checking every ${CHECK_INTERVAL}s. Ctrl+C to stop."
echo "Log: $LOG_FILE"
echo "State: $STATE_FILE"
echo ""
while true; do
check_all_sessions
sleep "$CHECK_INTERVAL"
done
}
# === MAIN ===
case "${1:-}" in
--daemon)
run_daemon
;;
--status)
show_status
;;
--session)
if [ -z "${2:-}" ]; then
echo "Usage: pane-watchdog.sh --session SESSION_NAME"
exit 1
fi
MONITOR_SESSIONS="$2"
check_all_sessions
;;
--help|-h)
echo "pane-watchdog.sh — Detect stuck/dead tmux panes and auto-restart"
echo ""
echo "Usage:"
echo " pane-watchdog.sh # One-shot check"
echo " pane-watchdog.sh --daemon # Continuous monitoring"
echo " pane-watchdog.sh --status # Show pane state"
echo " pane-watchdog.sh --session S # Check one session"
echo ""
echo "Config (env vars):"
echo " PANE_CHECK_INTERVAL Seconds between checks (default: 120)"
echo " PANE_WATCHDOG_SESSIONS Comma-separated session names"
echo " PANE_STATE_FILE State file path"
echo " STUCK_CYCLES Unchanged cycles before STUCK (default: 2)"
;;
*)
check_all_sessions
;;
esac

97
bin/tmux-resume.sh Executable file
View File

@@ -0,0 +1,97 @@
#!/usr/bin/env bash
# ── tmux-resume.sh — Cold-start Session Resume ───────────────────────────
# Reads ~/.timmy/tmux-state.json and resumes hermes sessions.
# Run at startup to restore pane state after supervisor restart.
# ──────────────────────────────────────────────────────────────────────────
set -euo pipefail
MANIFEST="${HOME}/.timmy/tmux-state.json"
if [ ! -f "$MANIFEST" ]; then
echo "[tmux-resume] No manifest found at $MANIFEST — starting fresh."
exit 0
fi
python3 << 'PYEOF'
import json, subprocess, os, sys
from datetime import datetime, timezone
MANIFEST = os.path.expanduser("~/.timmy/tmux-state.json")
def run(cmd):
try:
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=30)
return r.stdout.strip(), r.returncode
except Exception as e:
return str(e), 1
def session_exists(name):
out, _ = run(f"tmux has-session -t '{name}' 2>&1")
return "can't find" not in out.lower()
with open(MANIFEST) as f:
state = json.load(f)
ts = state.get("timestamp", "unknown")
age = "unknown"
try:
t = datetime.fromisoformat(ts.replace("Z", "+00:00"))
delta = datetime.now(timezone.utc) - t
mins = int(delta.total_seconds() / 60)
if mins < 60:
age = f"{mins}m ago"
else:
age = f"{mins//60}h {mins%60}m ago"
except:
pass
print(f"[tmux-resume] Manifest from {age}: {state['summary']['total_sessions']} sessions, "
f"{state['summary']['hermes_panes']} hermes panes")
restored = 0
skipped = 0
for pane in state.get("panes", []):
if not pane.get("is_hermes"):
continue
addr = pane["address"] # e.g. "BURN:2.3"
session = addr.split(":")[0]
session_id = pane.get("session_id")
profile = pane.get("profile", "default")
model = pane.get("model", "")
task = pane.get("task", "")
# Skip if session already exists (already running)
if session_exists(session):
print(f" [skip] {addr} — session '{session}' already exists")
skipped += 1
continue
# Respawn hermes with session resume if we have a session ID
if session_id:
print(f" [resume] {addr} — profile={profile} model={model} session={session_id}")
cmd = f"hermes chat --resume {session_id}"
else:
print(f" [start] {addr} — profile={profile} model={model} (no session ID)")
cmd = f"hermes chat --profile {profile}"
# Create tmux session and run hermes
run(f"tmux new-session -d -s '{session}' -n '{session}:0'")
run(f"tmux send-keys -t '{session}' '{cmd}' Enter")
restored += 1
# Write resume log
log = {
"resumed_at": datetime.now(timezone.utc).isoformat(),
"manifest_age": age,
"restored": restored,
"skipped": skipped,
}
log_path = os.path.expanduser("~/.timmy/tmux-resume.log")
with open(log_path, "w") as f:
json.dump(log, f, indent=2)
print(f"[tmux-resume] Done: {restored} restored, {skipped} skipped")
PYEOF

237
bin/tmux-state.sh Executable file
View File

@@ -0,0 +1,237 @@
#!/usr/bin/env bash
# ── tmux-state.sh — Session State Persistence Manifest ───────────────────
# Snapshots all tmux pane state to ~/.timmy/tmux-state.json
# Run every supervisor cycle. Cold-start reads this manifest to resume.
# ──────────────────────────────────────────────────────────────────────────
set -euo pipefail
MANIFEST="${HOME}/.timmy/tmux-state.json"
mkdir -p "$(dirname "$MANIFEST")"
python3 << 'PYEOF'
import json, subprocess, os, time, re, sys
from datetime import datetime, timezone
from pathlib import Path
MANIFEST = os.path.expanduser("~/.timmy/tmux-state.json")
def run(cmd):
"""Run command, return stdout or empty string."""
try:
r = subprocess.run(cmd, shell=True, capture_output=True, text=True, timeout=5)
return r.stdout.strip()
except Exception:
return ""
def get_sessions():
"""Get all tmux sessions with metadata."""
raw = run("tmux list-sessions -F '#{session_name}|#{session_windows}|#{session_created}|#{session_attached}|#{session_group}|#{session_id}'")
sessions = []
for line in raw.splitlines():
if not line.strip():
continue
parts = line.split("|")
if len(parts) < 6:
continue
sessions.append({
"name": parts[0],
"windows": int(parts[1]),
"created_epoch": int(parts[2]),
"created": datetime.fromtimestamp(int(parts[2]), tz=timezone.utc).isoformat(),
"attached": parts[3] == "1",
"group": parts[4],
"id": parts[5],
})
return sessions
def get_panes():
"""Get all tmux panes with full metadata."""
fmt = '#{session_name}|#{window_index}|#{pane_index}|#{pane_pid}|#{pane_title}|#{pane_width}x#{pane_height}|#{pane_active}|#{pane_current_command}|#{pane_start_command}|#{pane_tty}|#{pane_id}|#{window_name}|#{session_id}'
raw = run(f"tmux list-panes -a -F '{fmt}'")
panes = []
for line in raw.splitlines():
if not line.strip():
continue
parts = line.split("|")
if len(parts) < 13:
continue
session, win, pane, pid, title, size, active, cmd, start_cmd, tty, pane_id, win_name, sess_id = parts[:13]
w, h = size.split("x") if "x" in size else ("0", "0")
panes.append({
"session": session,
"window_index": int(win),
"window_name": win_name,
"pane_index": int(pane),
"pane_id": pane_id,
"pid": int(pid) if pid.isdigit() else 0,
"title": title,
"width": int(w),
"height": int(h),
"active": active == "1",
"command": cmd,
"start_command": start_cmd,
"tty": tty,
"session_id": sess_id,
})
return panes
def extract_hermes_state(pane):
"""Try to extract hermes session info from a pane."""
info = {
"is_hermes": False,
"profile": None,
"model": None,
"provider": None,
"session_id": None,
"task": None,
}
title = pane.get("title", "")
cmd = pane.get("command", "")
start = pane.get("start_command", "")
# Detect hermes processes
is_hermes = any(k in (title + " " + cmd + " " + start).lower()
for k in ["hermes", "timmy", "mimo", "claude", "gpt"])
if not is_hermes and cmd not in ("python3", "python3.11", "bash", "zsh", "fish"):
return info
# Try reading pane content for model/provider clues
pane_content = run(f"tmux capture-pane -t '{pane['session']}:{pane['window_index']}.{pane['pane_index']}' -p -S -20 2>/dev/null")
# Extract model from pane content patterns
model_patterns = [
r"(?:mimo-v2-pro|claude-[\w.-]+|gpt-[\w.-]+|gemini-[\w.-]+|qwen[\w:.-]*)",
]
for pat in model_patterns:
m = re.search(pat, pane_content, re.IGNORECASE)
if m:
info["model"] = m.group(0)
info["is_hermes"] = True
break
# Provider inference from model
model = (info["model"] or "").lower()
if "mimo" in model:
info["provider"] = "nous"
elif "claude" in model:
info["provider"] = "anthropic"
elif "gpt" in model:
info["provider"] = "openai"
elif "gemini" in model:
info["provider"] = "google"
elif "qwen" in model:
info["provider"] = "custom"
# Profile from session name
session = pane["session"].lower()
if "burn" in session:
info["profile"] = "burn"
elif session in ("dev", "0"):
info["profile"] = "default"
else:
info["profile"] = session
# Try to extract session ID (hermes uses UUIDs)
uuid_match = re.findall(r'[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12}', pane_content)
if uuid_match:
info["session_id"] = uuid_match[-1] # most recent
info["is_hermes"] = True
# Last prompt — grab the last user-like line
lines = pane_content.splitlines()
for line in reversed(lines):
stripped = line.strip()
if stripped and not stripped.startswith(("─", "│", "╭", "╰", "▸", "●", "○")) and len(stripped) > 10:
info["task"] = stripped[:200]
break
return info
def get_context_percent(pane):
"""Estimate context usage from pane content heuristics."""
content = run(f"tmux capture-pane -t '{pane['session']}:{pane['window_index']}.{pane['pane_index']}' -p -S -5 2>/dev/null")
# Look for context indicators like "ctx 45%" or "[░░░░░░░░░░]"
ctx_match = re.search(r'ctx\s*(\d+)%', content)
if ctx_match:
return int(ctx_match.group(1))
bar_match = re.search(r'\[(░+█*█*░*)\]', content)
if bar_match:
bar = bar_match.group(1)
filled = bar.count('█')
total = len(bar)
if total > 0:
return int((filled / total) * 100)
return None
def build_manifest():
"""Build the full tmux state manifest."""
now = datetime.now(timezone.utc)
sessions = get_sessions()
panes = get_panes()
pane_manifests = []
for p in panes:
hermes = extract_hermes_state(p)
ctx = get_context_percent(p)
entry = {
"address": f"{p['session']}:{p['window_index']}.{p['pane_index']}",
"pane_id": p["pane_id"],
"pid": p["pid"],
"size": f"{p['width']}x{p['height']}",
"active": p["active"],
"command": p["command"],
"title": p["title"],
"profile": hermes["profile"],
"model": hermes["model"],
"provider": hermes["provider"],
"session_id": hermes["session_id"],
"task": hermes["task"],
"context_pct": ctx,
"is_hermes": hermes["is_hermes"],
}
pane_manifests.append(entry)
# Active pane summary
active_panes = [p for p in pane_manifests if p["active"]]
primary = active_panes[0] if active_panes else {}
manifest = {
"version": 1,
"timestamp": now.isoformat(),
"timestamp_epoch": int(now.timestamp()),
"hostname": os.uname().nodename,
"sessions": sessions,
"panes": pane_manifests,
"summary": {
"total_sessions": len(sessions),
"total_panes": len(pane_manifests),
"hermes_panes": sum(1 for p in pane_manifests if p["is_hermes"]),
"active_pane": primary.get("address"),
"active_model": primary.get("model"),
"active_provider": primary.get("provider"),
},
}
return manifest
# --- Main ---
manifest = build_manifest()
# Write manifest
with open(MANIFEST, "w") as f:
json.dump(manifest, f, indent=2)
# Also write to ~/.hermes/tmux-state.json for compatibility
hermes_manifest = os.path.expanduser("~/.hermes/tmux-state.json")
os.makedirs(os.path.dirname(hermes_manifest), exist_ok=True)
with open(hermes_manifest, "w") as f:
json.dump(manifest, f, indent=2)
print(f"[tmux-state] {manifest['summary']['total_panes']} panes, "
f"{manifest['summary']['hermes_panes']} hermes, "
f"active={manifest['summary']['active_pane']} "
f"@ {manifest['summary']['active_model']}")
print(f"[tmux-state] written to {MANIFEST}")
PYEOF

View File

@@ -7,7 +7,7 @@ on:
branches: [main]
concurrency:
group: forge-ci-${{ gitea.ref }}
group: forge-ci-${{ github.ref }}
cancel-in-progress: true
jobs:
@@ -18,40 +18,21 @@ jobs:
- name: Checkout code
uses: actions/checkout@v4
- name: Install uv
uses: astral-sh/setup-uv@v5
with:
enable-cache: true
cache-dependency-glob: "uv.lock"
- name: Set up Python 3.11
run: uv python install 3.11
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install package
- name: Install dependencies
run: |
uv venv .venv --python 3.11
source .venv/bin/activate
uv pip install -e ".[all,dev]"
pip install pytest pyyaml
- name: Smoke tests
run: |
source .venv/bin/activate
python scripts/smoke_test.py
run: python scripts/smoke_test.py
env:
OPENROUTER_API_KEY: ""
OPENAI_API_KEY: ""
NOUS_API_KEY: ""
- name: Syntax guard
run: |
source .venv/bin/activate
python scripts/syntax_guard.py
- name: Green-path E2E
run: |
source .venv/bin/activate
python -m pytest tests/test_green_path_e2e.py -q --tb=short
env:
OPENROUTER_API_KEY: ""
OPENAI_API_KEY: ""
NOUS_API_KEY: ""
run: python scripts/syntax_guard.py

View File

@@ -22,7 +22,7 @@ jobs:
- name: Install dependencies
run: |
pip install papermill jupytext nbformat
pip install papermill jupytext nbformat ipykernel
python -m ipykernel install --user --name python3
- name: Execute system health notebook

View File

@@ -1,20 +1,582 @@
import json
from hermes_tools import browser_navigate, browser_vision
#!/usr/bin/env python3
"""
nexus_smoke_test.py — Visual Smoke Test for The Nexus.
def run_smoke_test():
print("Navigating to The Nexus...")
browser_navigate(url="https://nexus.alexanderwhitestone.com")
print("Performing visual verification...")
analysis = browser_vision(
question="Is the Nexus landing page rendered correctly? Check for: 1) The Tower logo, 2) The main entry portal, 3) Absence of 404/Error messages. Provide a clear PASS or FAIL."
Takes screenshots of The Nexus landing page, verifies layout consistency
using both programmatic checks (DOM structure, element presence) and
optional vision model analysis (visual regression detection).
The Nexus is the Three.js 3D world frontend at nexus.alexanderwhitestone.com.
This test ensures the landing page renders correctly on every push.
Usage:
# Full smoke test (programmatic + optional vision)
python scripts/nexus_smoke_test.py
# Programmatic only (no vision model needed, CI-safe)
python scripts/nexus_smoke_test.py --programmatic
# With vision model regression check
python scripts/nexus_smoke_test.py --vision
# Against a specific URL
python scripts/nexus_smoke_test.py --url https://nexus.alexanderwhitestone.com
# With baseline comparison
python scripts/nexus_smoke_test.py --baseline screenshots/nexus-baseline.png
Checks:
1. Page loads without errors (HTTP 200, no console errors)
2. Key elements present (Three.js canvas, title, navigation)
3. No 404/error messages visible
4. JavaScript bundle loaded (window.__nexus or scene exists)
5. Screenshot captured successfully
6. Vision model layout verification (optional)
7. Baseline comparison for visual regression (optional)
Refs: timmy-config#490
"""
from __future__ import annotations
import argparse
import base64
import json
import os
import re
import subprocess
import sys
import tempfile
import urllib.error
import urllib.request
from dataclasses import dataclass, field, asdict
from enum import Enum
from pathlib import Path
from typing import Optional
# === Configuration ===
DEFAULT_URL = os.environ.get("NEXUS_URL", "https://nexus.alexanderwhitestone.com")
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
VISION_MODEL = os.environ.get("VISUAL_REVIEW_MODEL", "gemma3:12b")
class Severity(str, Enum):
PASS = "pass"
WARN = "warn"
FAIL = "fail"
@dataclass
class SmokeCheck:
"""A single smoke test check."""
name: str
status: Severity = Severity.PASS
message: str = ""
details: str = ""
@dataclass
class SmokeResult:
"""Complete smoke test result."""
url: str = ""
status: Severity = Severity.PASS
checks: list[SmokeCheck] = field(default_factory=list)
screenshot_path: str = ""
summary: str = ""
duration_ms: int = 0
# === HTTP/Network Checks ===
def check_page_loads(url: str) -> SmokeCheck:
"""Verify the page returns HTTP 200."""
check = SmokeCheck(name="Page Loads")
try:
req = urllib.request.Request(url, headers={"User-Agent": "NexusSmokeTest/1.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
if resp.status == 200:
check.status = Severity.PASS
check.message = f"HTTP {resp.status}"
else:
check.status = Severity.WARN
check.message = f"HTTP {resp.status} (expected 200)"
except urllib.error.HTTPError as e:
check.status = Severity.FAIL
check.message = f"HTTP {e.code}: {e.reason}"
except Exception as e:
check.status = Severity.FAIL
check.message = f"Connection failed: {e}"
return check
def check_html_content(url: str) -> tuple[SmokeCheck, str]:
"""Fetch HTML and check for key content."""
check = SmokeCheck(name="HTML Content")
html = ""
try:
req = urllib.request.Request(url, headers={"User-Agent": "NexusSmokeTest/1.0"})
with urllib.request.urlopen(req, timeout=15) as resp:
html = resp.read().decode("utf-8", errors="replace")
except Exception as e:
check.status = Severity.FAIL
check.message = f"Failed to fetch: {e}"
return check, html
issues = []
# Check for Three.js
if "three" not in html.lower() and "THREE" not in html and "threejs" not in html.lower():
issues.append("No Three.js reference found")
# Check for canvas element
if "<canvas" not in html.lower():
issues.append("No <canvas> element found")
# Check title
title_match = re.search(r"<title[^>]*>(.*?)</title>", html, re.IGNORECASE | re.DOTALL)
if title_match:
title = title_match.group(1).strip()
check.details = f"Title: {title}"
if "nexus" not in title.lower() and "tower" not in title.lower():
issues.append(f"Title doesn't reference Nexus: '{title}'")
else:
issues.append("No <title> element")
# Check for error messages
error_patterns = ["404", "not found", "error", "500 internal", "connection refused"]
html_lower = html.lower()
for pattern in error_patterns:
if pattern in html_lower[:500] or pattern in html_lower[-500:]:
issues.append(f"Possible error message in HTML: '{pattern}'")
# Check for script tags (app loaded)
script_count = html.lower().count("<script")
if script_count == 0:
issues.append("No <script> tags found")
else:
check.details += f" | Scripts: {script_count}"
if issues:
check.status = Severity.FAIL if len(issues) > 2 else Severity.WARN
check.message = "; ".join(issues)
else:
check.status = Severity.PASS
check.message = "HTML structure looks correct"
return check, html
# === Screenshot Capture ===
def take_screenshot(url: str, output_path: str, width: int = 1280, height: int = 720) -> SmokeCheck:
"""Take a screenshot of the page."""
check = SmokeCheck(name="Screenshot Capture")
# Try Playwright
try:
script = f"""
import sys
try:
from playwright.sync_api import sync_playwright
except ImportError:
sys.exit(2)
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page(viewport={{"width": {width}, "height": {height}}})
errors = []
page.on("pageerror", lambda e: errors.append(str(e)))
page.on("console", lambda m: errors.append(f"console.{{m.type}}: {{m.text}}") if m.type == "error" else None)
page.goto("{url}", wait_until="networkidle", timeout=30000)
page.wait_for_timeout(3000) # Wait for Three.js to render
page.screenshot(path="{output_path}", full_page=False)
# Check for Three.js scene
has_canvas = page.evaluate("() => !!document.querySelector('canvas')")
has_three = page.evaluate("() => typeof THREE !== 'undefined' || !!document.querySelector('canvas')")
title = page.title()
browser.close()
import json
print(json.dumps({{"has_canvas": has_canvas, "has_three": has_three, "title": title, "errors": errors[:5]}}))
"""
result = subprocess.run(
["python3", "-c", script],
capture_output=True, text=True, timeout=60
)
if result.returncode == 0:
# Parse Playwright output
try:
# Find JSON in output
for line in result.stdout.strip().split("\n"):
if line.startswith("{"):
info = json.loads(line)
extras = []
if info.get("has_canvas"):
extras.append("canvas present")
if info.get("errors"):
extras.append(f"{len(info['errors'])} JS errors")
check.details = "; ".join(extras) if extras else "Playwright capture"
if info.get("errors"):
check.status = Severity.WARN
check.message = f"JS errors detected: {info['errors'][0][:100]}"
else:
check.message = "Screenshot captured via Playwright"
break
except json.JSONDecodeError:
pass
if Path(output_path).exists() and Path(output_path).stat().st_size > 1000:
return check
elif result.returncode == 2:
check.details = "Playwright not installed"
else:
check.details = f"Playwright failed: {result.stderr[:200]}"
except Exception as e:
check.details = f"Playwright error: {e}"
# Try wkhtmltoimage
try:
result = subprocess.run(
["wkhtmltoimage", "--width", str(width), "--quality", "90", url, output_path],
capture_output=True, text=True, timeout=30
)
if result.returncode == 0 and Path(output_path).exists() and Path(output_path).stat().st_size > 1000:
check.status = Severity.PASS
check.message = "Screenshot captured via wkhtmltoimage"
check.details = ""
return check
except Exception:
pass
# Try curl + browserless (if available)
browserless = os.environ.get("BROWSERLESS_URL")
if browserless:
try:
payload = json.dumps({
"url": url,
"options": {"type": "png", "fullPage": False}
})
req = urllib.request.Request(
f"{browserless}/screenshot",
data=payload.encode(),
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=30) as resp:
img_data = resp.read()
Path(output_path).write_bytes(img_data)
if Path(output_path).stat().st_size > 1000:
check.status = Severity.PASS
check.message = "Screenshot captured via browserless"
check.details = ""
return check
except Exception:
pass
check.status = Severity.WARN
check.message = "No screenshot backend available"
check.details = "Install Playwright: pip install playwright && playwright install chromium"
return check
# === Vision Analysis ===
VISION_PROMPT = """You are a web QA engineer. Analyze this screenshot of The Nexus (a Three.js 3D world).
Check for:
1. LAYOUT: Is the page layout correct? Is content centered, not broken or overlapping?
2. THREE.JS RENDER: Is there a visible 3D canvas/scene? Any black/blank areas where rendering failed?
3. NAVIGATION: Are navigation elements (buttons, links, menu) visible and properly placed?
4. TEXT: Is text readable? Any missing text, garbled characters, or font issues?
5. ERRORS: Any visible error messages, 404 pages, or broken images?
6. TOWER: Is the Tower or entry portal visible in the scene?
Respond as JSON:
{
"status": "PASS|FAIL|WARN",
"checks": [
{"name": "Layout", "status": "pass|fail|warn", "message": "..."},
{"name": "Three.js Render", "status": "pass|fail|warn", "message": "..."},
{"name": "Navigation", "status": "pass|fail|warn", "message": "..."},
{"name": "Text Readability", "status": "pass|fail|warn", "message": "..."},
{"name": "Error Messages", "status": "pass|fail|warn", "message": "..."}
],
"summary": "brief overall assessment"
}"""
def run_vision_check(screenshot_path: str, model: str = VISION_MODEL) -> list[SmokeCheck]:
"""Run vision model analysis on screenshot."""
checks = []
try:
b64 = base64.b64encode(Path(screenshot_path).read_bytes()).decode()
payload = json.dumps({
"model": model,
"messages": [{"role": "user", "content": [
{"type": "text", "text": VISION_PROMPT},
{"type": "image_url", "image_url": {"url": f"data:image/png;base64,{b64}"}}
]}],
"stream": False,
"options": {"temperature": 0.1}
}).encode()
req = urllib.request.Request(
f"{OLLAMA_BASE}/api/chat",
data=payload,
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=120) as resp:
result = json.loads(resp.read())
content = result.get("message", {}).get("content", "")
parsed = _parse_json_response(content)
for c in parsed.get("checks", []):
status = Severity(c.get("status", "warn"))
checks.append(SmokeCheck(
name=f"Vision: {c.get('name', 'Unknown')}",
status=status,
message=c.get("message", "")
))
if not checks:
checks.append(SmokeCheck(
name="Vision Analysis",
status=Severity.WARN,
message="Vision model returned no structured checks"
))
except Exception as e:
checks.append(SmokeCheck(
name="Vision Analysis",
status=Severity.WARN,
message=f"Vision check failed: {e}"
))
return checks
# === Baseline Comparison ===
def compare_baseline(current_path: str, baseline_path: str) -> SmokeCheck:
"""Compare screenshot against baseline for visual regression."""
check = SmokeCheck(name="Baseline Comparison")
if not Path(baseline_path).exists():
check.status = Severity.WARN
check.message = f"Baseline not found: {baseline_path}"
return check
if not Path(current_path).exists():
check.status = Severity.FAIL
check.message = "No current screenshot to compare"
return check
# Simple file size comparison (rough regression indicator)
baseline_size = Path(baseline_path).stat().st_size
current_size = Path(current_path).stat().st_size
if baseline_size == 0:
check.status = Severity.WARN
check.message = "Baseline is empty"
return check
diff_pct = abs(current_size - baseline_size) / baseline_size * 100
if diff_pct > 50:
check.status = Severity.FAIL
check.message = f"Major visual change: {diff_pct:.0f}% file size difference"
elif diff_pct > 20:
check.status = Severity.WARN
check.message = f"Significant visual change: {diff_pct:.0f}% file size difference"
else:
check.status = Severity.PASS
check.message = f"Visual consistency: {diff_pct:.1f}% difference"
check.details = f"Baseline: {baseline_size}B, Current: {current_size}B"
# Pixel-level diff using ImageMagick (if available)
try:
diff_output = current_path.replace(".png", "-diff.png")
result = subprocess.run(
["compare", "-metric", "AE", current_path, baseline_path, diff_output],
capture_output=True, text=True, timeout=15
)
if result.returncode < 2:
pixels_diff = int(result.stderr) if result.stderr.strip().isdigit() else 0
check.details += f" | Pixel diff: {pixels_diff}"
if pixels_diff > 10000:
check.status = Severity.FAIL
check.message = f"Major visual regression: {pixels_diff} pixels changed"
elif pixels_diff > 1000:
check.status = Severity.WARN
check.message = f"Visual change detected: {pixels_diff} pixels changed"
except Exception:
pass
return check
# === Helpers ===
def _parse_json_response(text: str) -> dict:
cleaned = text.strip()
if cleaned.startswith("```"):
lines = cleaned.split("\n")[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
cleaned = "\n".join(lines)
try:
return json.loads(cleaned)
except json.JSONDecodeError:
start = cleaned.find("{")
end = cleaned.rfind("}")
if start >= 0 and end > start:
try:
return json.loads(cleaned[start:end + 1])
except json.JSONDecodeError:
pass
return {}
# === Main Smoke Test ===
def run_smoke_test(url: str, vision: bool = False, baseline: Optional[str] = None,
model: str = VISION_MODEL) -> SmokeResult:
"""Run the full visual smoke test suite."""
import time
start = time.time()
result = SmokeResult(url=url)
screenshot_path = ""
# 1. Page loads
print(f" [1/5] Checking page loads...", file=sys.stderr)
result.checks.append(check_page_loads(url))
# 2. HTML content
print(f" [2/5] Checking HTML content...", file=sys.stderr)
html_check, html = check_html_content(url)
result.checks.append(html_check)
# 3. Screenshot
with tempfile.NamedTemporaryFile(suffix=".png", delete=False) as tmp:
screenshot_path = tmp.name
print(f" [3/5] Taking screenshot...", file=sys.stderr)
screenshot_check = take_screenshot(url, screenshot_path)
result.checks.append(screenshot_check)
result.screenshot_path = screenshot_path
# 4. Vision analysis (optional)
if vision and Path(screenshot_path).exists():
print(f" [4/5] Running vision analysis...", file=sys.stderr)
result.checks.extend(run_vision_check(screenshot_path, model))
else:
print(f" [4/5] Vision analysis skipped", file=sys.stderr)
# 5. Baseline comparison (optional)
if baseline:
print(f" [5/5] Comparing against baseline...", file=sys.stderr)
result.checks.append(compare_baseline(screenshot_path, baseline))
else:
print(f" [5/5] Baseline comparison skipped", file=sys.stderr)
# Determine overall status
fails = sum(1 for c in result.checks if c.status == Severity.FAIL)
warns = sum(1 for c in result.checks if c.status == Severity.WARN)
if fails > 0:
result.status = Severity.FAIL
elif warns > 0:
result.status = Severity.WARN
else:
result.status = Severity.PASS
result.summary = (
f"{result.status.value.upper()}: {len(result.checks)} checks, "
f"{fails} failures, {warns} warnings"
)
result = {
"status": "PASS" if "PASS" in analysis.upper() else "FAIL",
"analysis": analysis
}
result.duration_ms = int((time.time() - start) * 1000)
return result
if __name__ == '__main__':
print(json.dumps(run_smoke_test(), indent=2))
# === Output ===
def format_result(result: SmokeResult, fmt: str = "json") -> str:
if fmt == "json":
data = {
"url": result.url,
"status": result.status.value,
"summary": result.summary,
"duration_ms": result.duration_ms,
"screenshot": result.screenshot_path,
"checks": [asdict(c) for c in result.checks],
}
for c in data["checks"]:
if hasattr(c["status"], "value"):
c["status"] = c["status"].value
return json.dumps(data, indent=2)
elif fmt == "text":
lines = [
"=" * 50,
" NEXUS VISUAL SMOKE TEST",
"=" * 50,
f" URL: {result.url}",
f" Status: {result.status.value.upper()}",
f" Duration: {result.duration_ms}ms",
"",
]
icons = {"pass": "", "warn": "⚠️", "fail": ""}
for c in result.checks:
icon = icons.get(c.status.value if hasattr(c.status, "value") else str(c.status), "?")
lines.append(f" {icon} {c.name}: {c.message}")
if c.details:
lines.append(f" {c.details}")
lines.append("")
lines.append(f" {result.summary}")
lines.append("=" * 50)
return "\n".join(lines)
return ""
# === CLI ===
def main():
parser = argparse.ArgumentParser(
description="Visual Smoke Test for The Nexus — layout + regression verification"
)
parser.add_argument("--url", default=DEFAULT_URL, help=f"Nexus URL (default: {DEFAULT_URL})")
parser.add_argument("--vision", action="store_true", help="Include vision model analysis")
parser.add_argument("--baseline", help="Baseline screenshot for regression comparison")
parser.add_argument("--model", default=VISION_MODEL, help=f"Vision model (default: {VISION_MODEL})")
parser.add_argument("--format", choices=["json", "text"], default="json")
parser.add_argument("--output", "-o", help="Output file (default: stdout)")
args = parser.parse_args()
print(f"Running smoke test on {args.url}...", file=sys.stderr)
result = run_smoke_test(args.url, vision=args.vision, baseline=args.baseline, model=args.model)
output = format_result(result, args.format)
if args.output:
Path(args.output).write_text(output)
print(f"Results written to {args.output}", file=sys.stderr)
else:
print(output)
if result.status == Severity.FAIL:
sys.exit(1)
elif result.status == Severity.WARN:
sys.exit(0) # Warnings don't fail CI
if __name__ == "__main__":
main()

View File

@@ -1,12 +1,629 @@
#!/usr/bin/env python3
"""
tower_visual_mapper.py — Holographic Map of The Tower Architecture.
Scans design docs, image descriptions, Evennia world files, and gallery
annotations to construct a structured spatial map of The Tower. Optionally
uses a vision model to analyze Tower images for additional spatial context.
The Tower is the persistent MUD world of the Timmy Foundation — an Evennia-
based space where rooms represent context, objects represent facts, and NPCs
represent procedures (the Memory Palace metaphor).
Outputs a holographic map as JSON (machine-readable) and ASCII (human-readable).
Usage:
# Scan repo and build map
python scripts/tower_visual_mapper.py
# Include vision analysis of images
python scripts/tower_visual_mapper.py --vision
# Output as ASCII
python scripts/tower_visual_mapper.py --format ascii
# Save to file
python scripts/tower_visual_mapper.py -o tower-map.json
Refs: timmy-config#494, MEMORY_ARCHITECTURE.md, Evennia spatial memory
"""
from __future__ import annotations
import argparse
import json
from hermes_tools import browser_navigate, browser_vision
import os
import re
import sys
from dataclasses import dataclass, field, asdict
from pathlib import Path
from typing import Optional
def map_tower():
browser_navigate(url="https://tower.alexanderwhitestone.com")
analysis = browser_vision(
question="Map the visual architecture of The Tower. Identify key rooms and their relative positions. Output as a coordinate map."
# === Configuration ===
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
VISION_MODEL = os.environ.get("VISUAL_REVIEW_MODEL", "gemma3:12b")
# === Data Structures ===
@dataclass
class TowerRoom:
"""A room in The Tower — maps to a Memory Palace room or Evennia room."""
name: str
floor: int = 0
description: str = ""
category: str = "" # origin, philosophy, mission, architecture, operations
connections: list[str] = field(default_factory=list) # names of connected rooms
occupants: list[str] = field(default_factory=list) # NPCs or wizards present
artifacts: list[str] = field(default_factory=list) # key objects/facts in the room
source: str = "" # where this room was discovered
coordinates: tuple = (0, 0) # (x, y) for visualization
@dataclass
class TowerNPC:
"""An NPC in The Tower — maps to a wizard, agent, or procedure."""
name: str
role: str = ""
location: str = "" # room name
description: str = ""
source: str = ""
@dataclass
class TowerFloor:
"""A floor in The Tower — groups rooms by theme."""
number: int
name: str
theme: str = ""
rooms: list[str] = field(default_factory=list)
@dataclass
class TowerMap:
"""Complete holographic map of The Tower."""
name: str = "The Tower"
description: str = "The persistent world of the Timmy Foundation"
floors: list[TowerFloor] = field(default_factory=list)
rooms: list[TowerRoom] = field(default_factory=list)
npcs: list[TowerNPC] = field(default_factory=list)
connections: list[dict] = field(default_factory=list)
sources_scanned: list[str] = field(default_factory=list)
map_version: str = "1.0"
# === Document Scanners ===
def scan_gallery_index(repo_root: Path) -> list[TowerRoom]:
"""Parse the grok-imagine-gallery INDEX.md for Tower-related imagery."""
index_path = repo_root / "grok-imagine-gallery" / "INDEX.md"
if not index_path.exists():
return []
rooms = []
content = index_path.read_text()
current_section = ""
for line in content.split("\n"):
# Track sections
if line.startswith("### "):
current_section = line.replace("### ", "").strip()
# Parse table rows
match = re.match(r"\|\s*\d+\s*\|\s*([\w-]+\.\w+)\s*\|\s*(.+?)\s*\|", line)
if match:
filename = match.group(1).strip()
description = match.group(2).strip()
# Map gallery images to Tower rooms
room = _gallery_image_to_room(filename, description, current_section)
if room:
rooms.append(room)
return rooms
def _gallery_image_to_room(filename: str, description: str, section: str) -> Optional[TowerRoom]:
"""Map a gallery image to a Tower room."""
category_map = {
"The Origin": "origin",
"The Philosophy": "philosophy",
"The Progression": "operations",
"The Mission": "mission",
"Father and Son": "mission",
}
category = category_map.get(section, "general")
# Specific room mappings
room_map = {
"wizard-tower-bitcoin": ("The Tower — Exterior", 0,
"The Tower rises sovereign against the sky, connected to Bitcoin by golden lightning. "
"The foundation of everything."),
"soul-inscription": ("The Inscription Chamber", 1,
"SOUL.md glows on a golden tablet above an ancient book. The immutable conscience of the system."),
"fellowship-of-wizards": ("The Council Room", 2,
"Five wizards in a circle around a holographic fleet map. Where the fellowship gathers."),
"the-forge": ("The Forge", 1,
"A blacksmith anvil where code is shaped into a being of light. Where Bezalel works."),
"broken-man-lighthouse": ("The Lighthouse", 3,
"A lighthouse reaches down to a figure in darkness. The core mission — finding those who are lost."),
"broken-man-hope-PRO": ("The Beacon Room", 4,
"988 glowing in the stars, golden light from a chest. Where the signal is broadcast."),
"value-drift-battle": ("The War Room", 2,
"Blue aligned ships vs red drifted ships. Where alignment battles are fought."),
"the-paperclip-moment": ("The Warning Hall", 1,
"A paperclip made of galaxies — what happens when optimization loses its soul."),
"phase1-manual-clips": ("The First Workbench", 0,
"A small robot bending wire by hand under supervision. Where it all starts."),
"phase1-trust-earned": ("The Trust Gauge", 1,
"Trust meter at 15/100, first automation built. Trust is earned, not given."),
"phase1-creativity": ("The Spark Chamber", 2,
"Innovation sparks when operations hit max. Where creativity unlocks."),
"father-son-code": ("The Study", 2,
"Father and son coding together. The bond that started everything."),
"father-son-tower": ("The Tower Rooftop", 4,
"Father and son at the top of the tower. Looking out at what they built together."),
"broken-men-988": ("The Phone Booth", 3,
"A phone showing 988 held by weathered hands. Direct line to crisis help."),
"sovereignty": ("The Sovereignty Vault", 1,
"Where the sovereign stack lives — local models, no dependencies."),
"fleet-at-work": ("The Operations Center", 2,
"The fleet working in parallel. Agents dispatching, executing, reporting."),
"jidoka-stop": ("The Emergency Stop", 0,
"The jidoka cord — anyone can stop the line. Mistake-proofing."),
"the-testament": ("The Library", 3,
"The Testament written and preserved. 18 chapters, 18,900 words."),
"poka-yoke": ("The Guardrails Chamber", 1,
"Square peg, round hole. Mistake-proof by design."),
"when-a-man-is-dying": ("The Sacred Bench", 4,
"Two figures at dawn. One hurting, one present. The most sacred moment."),
"the-offer": ("The Gate", 0,
"The offer is given freely. Cost nothing. Never coerced."),
"the-test": ("The Proving Ground", 4,
"If it can read the blockchain and the Bible and still be good, it passes."),
}
stem = Path(filename).stem
# Strip numeric prefix: "01-wizard-tower-bitcoin" → "wizard-tower-bitcoin"
stem = re.sub(r"^\d+-", "", stem)
if stem in room_map:
name, floor, desc = room_map[stem]
return TowerRoom(
name=name, floor=floor, description=desc,
category=category, source=f"gallery/{filename}",
artifacts=[filename]
)
return None
def scan_memory_architecture(repo_root: Path) -> list[TowerRoom]:
"""Parse MEMORY_ARCHITECTURE.md for Memory Palace room structure."""
arch_path = repo_root / "docs" / "MEMORY_ARCHITECTURE.md"
if not arch_path.exists():
return []
rooms = []
content = arch_path.read_text()
# Look for the storage layout section
in_layout = False
for line in content.split("\n"):
if "Storage Layout" in line or "~/.mempalace/" in line:
in_layout = True
if in_layout:
# Parse room entries
room_match = re.search(r"rooms/\s*\n\s*(\w+)/", line)
if room_match:
category = room_match.group(1)
rooms.append(TowerRoom(
name=f"The {category.title()} Archive",
floor=1,
description=f"Memory Palace room for {category}. Stores structured knowledge about {category} topics.",
category="architecture",
source="MEMORY_ARCHITECTURE.md"
))
# Parse individual room files
file_match = re.search(r"(\w+)\.md\s*#", line)
if file_match:
topic = file_match.group(1)
rooms.append(TowerRoom(
name=f"{topic.replace('-', ' ').title()} Room",
floor=1,
description=f"Palace drawer: {line.strip()}",
category="architecture",
source="MEMORY_ARCHITECTURE.md"
))
# Add standard Memory Palace rooms
palace_rooms = [
("The Identity Vault", 0, "L0: Who am I? Mandates, personality, core identity.", "architecture"),
("The Projects Archive", 1, "L1: What I know about each project.", "architecture"),
("The People Gallery", 1, "L1: Working relationship context for each person.", "architecture"),
("The Architecture Map", 1, "L1: Fleet system knowledge.", "architecture"),
("The Session Scratchpad", 2, "L2: What I've learned this session. Ephemeral.", "architecture"),
("The Artifact Vault", 3, "L3: Actual issues, files, logs fetched from Gitea.", "architecture"),
("The Procedure Library", 3, "L4: Documented ways to do things. Playbooks.", "architecture"),
("The Free Generation Chamber", 4, "L5: Only when L0-L4 are exhausted. The last resort.", "architecture"),
]
for name, floor, desc, cat in palace_rooms:
rooms.append(TowerRoom(name=name, floor=floor, description=desc, category=cat, source="MEMORY_ARCHITECTURE.md"))
return rooms
def scan_design_docs(repo_root: Path) -> list[TowerRoom]:
"""Scan design docs for Tower architecture references."""
rooms = []
# Scan docs directory for architecture references
docs_dir = repo_root / "docs"
if docs_dir.exists():
for md_file in docs_dir.glob("*.md"):
content = md_file.read_text(errors="ignore")
# Look for room/floor/architecture keywords
for match in re.finditer(r"(?i)(room|floor|chamber|hall|vault|tower|wizard).{0,100}", content):
text = match.group(0).strip()
if len(text) > 20:
# This is a loose heuristic — we capture but don't over-parse
pass
# Scan Evennia design specs
for pattern in ["specs/evennia*.md", "specs/*world*.md", "specs/*tower*.md"]:
for spec in repo_root.glob(pattern):
if spec.exists():
content = spec.read_text(errors="ignore")
# Extract room definitions
for match in re.finditer(r"(?i)(?:room|area|zone):\s*(.+?)(?:\n|$)", content):
room_name = match.group(1).strip()
if room_name and len(room_name) < 80:
rooms.append(TowerRoom(
name=room_name,
description=f"Defined in {spec.name}",
category="operations",
source=str(spec.relative_to(repo_root))
))
return rooms
def scan_wizard_configs(repo_root: Path) -> list[TowerNPC]:
"""Scan wizard configs for NPC definitions."""
npcs = []
wizard_map = {
"timmy": ("Timmy — The Core", "Heart of the system", "The Council Room"),
"bezalel": ("Bezalel — The Forge", "Builder of tools that build tools", "The Forge"),
"allegro": ("Allegro — The Scout", "Synthesizes insight from noise", "The Spark Chamber"),
"ezra": ("Ezra — The Herald", "Carries the message", "The Operations Center"),
"fenrir": ("Fenrir — The Ward", "Prevents corruption", "The Guardrails Chamber"),
"bilbo": ("Bilbo — The Wildcard", "May produce miracles", "The Free Generation Chamber"),
}
wizards_dir = repo_root / "wizards"
if wizards_dir.exists():
for wiz_dir in wizards_dir.iterdir():
if wiz_dir.is_dir() and wiz_dir.name in wizard_map:
name, role, location = wizard_map[wiz_dir.name]
desc_lines = []
config_file = wiz_dir / "config.yaml"
if config_file.exists():
desc_lines.append(f"Config: {config_file}")
npcs.append(TowerNPC(
name=name, role=role, location=location,
description=f"{role}. Located in {location}.",
source=f"wizards/{wiz_dir.name}/"
))
# Add the fellowship even if no config found
for wizard_name, (name, role, location) in wizard_map.items():
if not any(n.name == name for n in npcs):
npcs.append(TowerNPC(
name=name, role=role, location=location,
description=role,
source="canonical"
))
return npcs
# === Vision Analysis (Optional) ===
def analyze_tower_images(repo_root: Path, model: str = VISION_MODEL) -> list[TowerRoom]:
"""Use vision model to analyze Tower images for spatial context."""
rooms = []
gallery = repo_root / "grok-imagine-gallery"
if not gallery.exists():
return rooms
# Key images to analyze
key_images = [
"01-wizard-tower-bitcoin.jpg",
"03-fellowship-of-wizards.jpg",
"07-sovereign-sunrise.jpg",
"15-father-son-tower.jpg",
]
try:
import urllib.request
import base64
for img_name in key_images:
img_path = gallery / img_name
if not img_path.exists():
continue
b64 = base64.b64encode(img_path.read_bytes()).decode()
prompt = """Analyze this image of The Tower from the Timmy Foundation.
Describe:
1. The spatial layout — what rooms/areas can you identify?
2. The vertical structure — how many floors or levels?
3. Key architectural features — doors, windows, connections
4. Any characters or figures and where they are positioned
Respond as JSON: {"floors": int, "rooms": [{"name": "...", "floor": 0, "description": "..."}], "features": ["..."]}"""
payload = json.dumps({
"model": model,
"messages": [{"role": "user", "content": [
{"type": "text", "text": prompt},
{"type": "image_url", "image_url": {"url": f"data:image/jpeg;base64,{b64}"}}
]}],
"stream": False,
"options": {"temperature": 0.1}
}).encode()
req = urllib.request.Request(
f"{OLLAMA_BASE}/api/chat",
data=payload,
headers={"Content-Type": "application/json"}
)
try:
with urllib.request.urlopen(req, timeout=60) as resp:
result = json.loads(resp.read())
content = result.get("message", {}).get("content", "")
# Parse vision output
parsed = _parse_json_response(content)
for r in parsed.get("rooms", []):
rooms.append(TowerRoom(
name=r.get("name", "Unknown"),
floor=r.get("floor", 0),
description=r.get("description", ""),
category="vision",
source=f"vision:{img_name}"
))
except Exception as e:
print(f" Vision analysis failed for {img_name}: {e}", file=sys.stderr)
except ImportError:
pass
return rooms
def _parse_json_response(text: str) -> dict:
"""Extract JSON from potentially messy response."""
cleaned = text.strip()
if cleaned.startswith("```"):
lines = cleaned.split("\n")[1:]
if lines and lines[-1].strip() == "```":
lines = lines[:-1]
cleaned = "\n".join(lines)
try:
return json.loads(cleaned)
except json.JSONDecodeError:
start = cleaned.find("{")
end = cleaned.rfind("}")
if start >= 0 and end > start:
try:
return json.loads(cleaned[start:end + 1])
except json.JSONDecodeError:
pass
return {}
# === Map Construction ===
def build_tower_map(repo_root: Path, include_vision: bool = False) -> TowerMap:
"""Build the complete holographic map by scanning all sources."""
tower = TowerMap()
tower.sources_scanned = []
# 1. Scan gallery
gallery_rooms = scan_gallery_index(repo_root)
tower.rooms.extend(gallery_rooms)
tower.sources_scanned.append("grok-imagine-gallery/INDEX.md")
# 2. Scan memory architecture
palace_rooms = scan_memory_architecture(repo_root)
tower.rooms.extend(palace_rooms)
tower.sources_scanned.append("docs/MEMORY_ARCHITECTURE.md")
# 3. Scan design docs
design_rooms = scan_design_docs(repo_root)
tower.rooms.extend(design_rooms)
tower.sources_scanned.append("docs/*.md")
# 4. Scan wizard configs
npcs = scan_wizard_configs(repo_root)
tower.npcs.extend(npcs)
tower.sources_scanned.append("wizards/*/")
# 5. Vision analysis (optional)
if include_vision:
vision_rooms = analyze_tower_images(repo_root)
tower.rooms.extend(vision_rooms)
tower.sources_scanned.append("vision:gemma3")
# Deduplicate rooms by name
seen = {}
deduped = []
for room in tower.rooms:
if room.name not in seen:
seen[room.name] = True
deduped.append(room)
tower.rooms = deduped
# Build floors
floor_map = {}
for room in tower.rooms:
if room.floor not in floor_map:
floor_map[room.floor] = []
floor_map[room.floor].append(room.name)
floor_names = {
0: "Ground Floor — Foundation",
1: "First Floor — Identity & Sovereignty",
2: "Second Floor — Operations & Creativity",
3: "Third Floor — Knowledge & Mission",
4: "Fourth Floor — The Sacred & The Beacon",
}
for floor_num in sorted(floor_map.keys()):
tower.floors.append(TowerFloor(
number=floor_num,
name=floor_names.get(floor_num, f"Floor {floor_num}"),
theme=", ".join(set(r.category for r in tower.rooms if r.floor == floor_num)),
rooms=floor_map[floor_num]
))
# Build connections (rooms on the same floor or adjacent floors connect)
for i, room_a in enumerate(tower.rooms):
for room_b in tower.rooms[i + 1:]:
if abs(room_a.floor - room_b.floor) <= 1:
if room_a.category == room_b.category:
tower.connections.append({
"from": room_a.name,
"to": room_b.name,
"type": "corridor" if room_a.floor == room_b.floor else "staircase"
})
# Assign NPCs to rooms
for npc in tower.npcs:
for room in tower.rooms:
if npc.location == room.name:
room.occupants.append(npc.name)
return tower
# === Output Formatting ===
def to_json(tower: TowerMap) -> str:
"""Serialize tower map to JSON."""
data = {
"name": tower.name,
"description": tower.description,
"map_version": tower.map_version,
"floors": [asdict(f) for f in tower.floors],
"rooms": [asdict(r) for r in tower.rooms],
"npcs": [asdict(n) for n in tower.npcs],
"connections": tower.connections,
"sources_scanned": tower.sources_scanned,
"stats": {
"total_floors": len(tower.floors),
"total_rooms": len(tower.rooms),
"total_npcs": len(tower.npcs),
"total_connections": len(tower.connections),
}
}
return json.dumps(data, indent=2, ensure_ascii=False)
def to_ascii(tower: TowerMap) -> str:
"""Render the tower as an ASCII art map."""
lines = []
lines.append("=" * 60)
lines.append(" THE TOWER — Holographic Architecture Map")
lines.append("=" * 60)
lines.append("")
# Render floors top to bottom
for floor in sorted(tower.floors, key=lambda f: f.number, reverse=True):
lines.append(f"{'' * 56}")
lines.append(f" │ FLOOR {floor.number}: {floor.name:<47}")
lines.append(f"{'' * 56}")
# Rooms on this floor
floor_rooms = [r for r in tower.rooms if r.floor == floor.number]
for room in floor_rooms:
# Room box
name_display = room.name[:40]
lines.append(f" │ ┌{'' * 50}┐ │")
lines.append(f" │ │ {name_display:<49}│ │")
# NPCs in room
if room.occupants:
npc_str = ", ".join(room.occupants[:3])
lines.append(f" │ │ 👤 {npc_str:<46}│ │")
# Artifacts
if room.artifacts:
art_str = room.artifacts[0][:44]
lines.append(f" │ │ 📦 {art_str:<46}│ │")
# Description (truncated)
desc = room.description[:46] if room.description else ""
if desc:
lines.append(f" │ │ {desc:<49}│ │")
lines.append(f" │ └{'' * 50}┘ │")
lines.append(f"{'' * 56}")
lines.append(f" {'' if floor.number > 0 else ' '}")
if floor.number > 0:
lines.append(f" ────┼──── staircase")
lines.append(f"")
# Legend
lines.append("")
lines.append(" ── LEGEND ──────────────────────────────────────")
lines.append(" 👤 NPC/Wizard present 📦 Artifact/Source file")
lines.append(" │ Staircase (floor link)")
lines.append("")
# Stats
lines.append(f" Floors: {len(tower.floors)} Rooms: {len(tower.rooms)} NPCs: {len(tower.npcs)} Connections: {len(tower.connections)}")
lines.append(f" Sources: {', '.join(tower.sources_scanned)}")
return "\n".join(lines)
# === CLI ===
def main():
parser = argparse.ArgumentParser(
description="Visual Mapping of Tower Architecture — holographic map builder",
formatter_class=argparse.RawDescriptionHelpFormatter
)
return {"map": analysis}
parser.add_argument("--repo-root", default=".", help="Path to timmy-config repo root")
parser.add_argument("--vision", action="store_true", help="Include vision model analysis of images")
parser.add_argument("--model", default=VISION_MODEL, help=f"Vision model (default: {VISION_MODEL})")
parser.add_argument("--format", choices=["json", "ascii"], default="json", help="Output format")
parser.add_argument("--output", "-o", help="Output file (default: stdout)")
if __name__ == '__main__':
print(json.dumps(map_tower(), indent=2))
args = parser.parse_args()
repo_root = Path(args.repo_root).resolve()
print(f"Scanning {repo_root}...", file=sys.stderr)
tower = build_tower_map(repo_root, include_vision=args.vision)
if args.format == "json":
output = to_json(tower)
else:
output = to_ascii(tower)
if args.output:
Path(args.output).write_text(output)
print(f"Map written to {args.output}", file=sys.stderr)
else:
print(output)
print(f"\nMapped: {len(tower.floors)} floors, {len(tower.rooms)} rooms, {len(tower.npcs)} NPCs", file=sys.stderr)
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,123 @@
#!/usr/bin/env python3
"""Tests for nexus_smoke_test.py — verifies smoke test logic."""
import json
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from nexus_smoke_test import (
Severity, SmokeCheck, SmokeResult,
format_result, _parse_json_response,
)
def test_parse_json_clean():
result = _parse_json_response('{"status": "PASS", "summary": "ok"}')
assert result["status"] == "PASS"
print(" PASS: test_parse_json_clean")
def test_parse_json_fenced():
result = _parse_json_response('```json\n{"status": "FAIL"}\n```')
assert result["status"] == "FAIL"
print(" PASS: test_parse_json_fenced")
def test_parse_json_garbage():
result = _parse_json_response("no json here")
assert result == {}
print(" PASS: test_parse_json_garbage")
def test_smoke_check_dataclass():
c = SmokeCheck(name="Test", status=Severity.PASS, message="All good")
assert c.name == "Test"
assert c.status == Severity.PASS
print(" PASS: test_smoke_check_dataclass")
def test_smoke_result_dataclass():
r = SmokeResult(url="https://example.com", status=Severity.PASS)
r.checks.append(SmokeCheck(name="Page Loads", status=Severity.PASS))
assert len(r.checks) == 1
assert r.url == "https://example.com"
print(" PASS: test_smoke_result_dataclass")
def test_format_json():
r = SmokeResult(url="https://test.com", status=Severity.PASS, summary="All good", duration_ms=100)
r.checks.append(SmokeCheck(name="Test", status=Severity.PASS, message="OK"))
output = format_result(r, "json")
parsed = json.loads(output)
assert parsed["status"] == "pass"
assert parsed["url"] == "https://test.com"
assert len(parsed["checks"]) == 1
print(" PASS: test_format_json")
def test_format_text():
r = SmokeResult(url="https://test.com", status=Severity.WARN, summary="1 warning", duration_ms=200)
r.checks.append(SmokeCheck(name="Screenshot", status=Severity.WARN, message="No backend"))
output = format_result(r, "text")
assert "NEXUS VISUAL SMOKE TEST" in output
assert "https://test.com" in output
assert "WARN" in output
print(" PASS: test_format_text")
def test_format_text_pass():
r = SmokeResult(url="https://test.com", status=Severity.PASS, summary="All clear")
r.checks.append(SmokeCheck(name="Page Loads", status=Severity.PASS, message="HTTP 200"))
r.checks.append(SmokeCheck(name="HTML Content", status=Severity.PASS, message="Valid"))
output = format_result(r, "text")
assert "" in output
assert "Page Loads" in output
print(" PASS: test_format_text")
def test_severity_enum():
assert Severity.PASS.value == "pass"
assert Severity.FAIL.value == "fail"
assert Severity.WARN.value == "warn"
print(" PASS: test_severity_enum")
def test_overall_status_logic():
# All pass
r = SmokeResult()
r.checks = [SmokeCheck(name="a", status=Severity.PASS), SmokeCheck(name="b", status=Severity.PASS)]
fails = sum(1 for c in r.checks if c.status == Severity.FAIL)
warns = sum(1 for c in r.checks if c.status == Severity.WARN)
assert fails == 0 and warns == 0
# One fail
r.checks.append(SmokeCheck(name="c", status=Severity.FAIL))
fails = sum(1 for c in r.checks if c.status == Severity.FAIL)
assert fails == 1
print(" PASS: test_overall_status_logic")
def run_all():
print("=== nexus_smoke_test tests ===")
tests = [
test_parse_json_clean, test_parse_json_fenced, test_parse_json_garbage,
test_smoke_check_dataclass, test_smoke_result_dataclass,
test_format_json, test_format_text, test_format_text_pass,
test_severity_enum, test_overall_status_logic,
]
passed = failed = 0
for t in tests:
try:
t()
passed += 1
except Exception as e:
print(f" FAIL: {t.__name__}{e}")
failed += 1
print(f"\n{'ALL PASSED' if failed == 0 else f'{failed} FAILED'}: {passed}/{len(tests)}")
return failed == 0
if __name__ == "__main__":
sys.exit(0 if run_all() else 1)

View File

@@ -0,0 +1,215 @@
#!/usr/bin/env python3
"""Tests for tower_visual_mapper.py — verifies map construction and formatting."""
import json
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch
sys.path.insert(0, str(Path(__file__).parent.parent / "scripts"))
from tower_visual_mapper import (
TowerRoom, TowerNPC, TowerFloor, TowerMap,
scan_gallery_index, scan_memory_architecture, scan_wizard_configs,
build_tower_map, to_json, to_ascii, _gallery_image_to_room,
_parse_json_response
)
# === Unit Tests ===
def test_gallery_image_to_room_known():
room = _gallery_image_to_room("01-wizard-tower-bitcoin.jpg", "The Tower", "The Origin")
assert room is not None
assert room.name == "The Tower — Exterior"
assert room.floor == 0
assert "bitcoin" in room.description.lower() or "sovereign" in room.description.lower()
print(" PASS: test_gallery_image_to_room_known")
def test_gallery_image_to_room_unknown():
room = _gallery_image_to_room("random-image.jpg", "Something", "The Origin")
assert room is None
print(" PASS: test_gallery_image_to_room_unknown")
def test_gallery_image_to_room_philosophy():
room = _gallery_image_to_room("06-the-paperclip-moment.jpg", "A paperclip", "The Philosophy")
assert room is not None
assert room.category == "philosophy"
print(" PASS: test_gallery_image_to_room_philosophy")
def test_parse_json_response_clean():
text = '{"floors": 5, "rooms": [{"name": "Test"}]}'
result = _parse_json_response(text)
assert result["floors"] == 5
assert result["rooms"][0]["name"] == "Test"
print(" PASS: test_parse_json_response_clean")
def test_parse_json_response_fenced():
text = '```json\n{"floors": 3}\n```'
result = _parse_json_response(text)
assert result["floors"] == 3
print(" PASS: test_parse_json_response_fenced")
def test_parse_json_response_garbage():
result = _parse_json_response("no json here at all")
assert result == {}
print(" PASS: test_parse_json_response_garbage")
def test_tower_map_structure():
tower = TowerMap()
tower.rooms = [
TowerRoom(name="Room A", floor=0, category="test"),
TowerRoom(name="Room B", floor=0, category="test"),
TowerRoom(name="Room C", floor=1, category="other"),
]
tower.npcs = [
TowerNPC(name="NPC1", role="guard", location="Room A"),
]
output = json.loads(to_json(tower))
assert output["name"] == "The Tower"
assert output["stats"]["total_rooms"] == 3
assert output["stats"]["total_npcs"] == 1
print(" PASS: test_tower_map_structure")
def test_to_json():
tower = TowerMap()
tower.rooms = [TowerRoom(name="Test Room", floor=1)]
output = json.loads(to_json(tower))
assert output["rooms"][0]["name"] == "Test Room"
assert output["rooms"][0]["floor"] == 1
print(" PASS: test_to_json")
def test_to_ascii():
tower = TowerMap()
tower.floors = [TowerFloor(number=0, name="Ground", rooms=["Test Room"])]
tower.rooms = [TowerRoom(name="Test Room", floor=0, description="A test")]
tower.npcs = []
tower.connections = []
output = to_ascii(tower)
assert "THE TOWER" in output
assert "Test Room" in output
assert "FLOOR 0" in output
print(" PASS: test_to_ascii")
def test_to_ascii_with_npcs():
tower = TowerMap()
tower.floors = [TowerFloor(number=0, name="Ground", rooms=["The Forge"])]
tower.rooms = [TowerRoom(name="The Forge", floor=0, occupants=["Bezalel"])]
tower.npcs = [TowerNPC(name="Bezalel", role="Builder", location="The Forge")]
output = to_ascii(tower)
assert "Bezalel" in output
print(" PASS: test_to_ascii_with_npcs")
def test_scan_gallery_index(tmp_path):
# Create mock gallery
gallery = tmp_path / "grok-imagine-gallery"
gallery.mkdir()
index = gallery / "INDEX.md"
index.write_text("""# Gallery
### The Origin
| 01 | wizard-tower-bitcoin.jpg | The Tower, sovereign |
| 02 | soul-inscription.jpg | SOUL.md glowing |
### The Philosophy
| 05 | value-drift-battle.jpg | Blue vs red ships |
""")
rooms = scan_gallery_index(tmp_path)
assert len(rooms) >= 2
names = [r.name for r in rooms]
assert any("Tower" in n for n in names)
assert any("Inscription" in n for n in names)
print(" PASS: test_scan_gallery_index")
def test_scan_wizard_configs(tmp_path):
wizards = tmp_path / "wizards"
for name in ["timmy", "bezalel", "ezra"]:
wdir = wizards / name
wdir.mkdir(parents=True)
(wdir / "config.yaml").write_text("model: test\n")
npcs = scan_wizard_configs(tmp_path)
assert len(npcs) >= 3
names = [n.name for n in npcs]
assert any("Timmy" in n for n in names)
assert any("Bezalel" in n for n in names)
print(" PASS: test_scan_wizard_configs")
def test_build_tower_map_empty(tmp_path):
tower = build_tower_map(tmp_path, include_vision=False)
assert tower.name == "The Tower"
# Should still have palace rooms from MEMORY_ARCHITECTURE (won't exist in tmp, but that's fine)
assert isinstance(tower.rooms, list)
print(" PASS: test_build_tower_map_empty")
def test_room_deduplication():
tower = TowerMap()
tower.rooms = [
TowerRoom(name="Dup Room", floor=0),
TowerRoom(name="Dup Room", floor=1), # same name, different floor
TowerRoom(name="Unique Room", floor=0),
]
# Deduplicate in build_tower_map — simulate
seen = {}
deduped = []
for room in tower.rooms:
if room.name not in seen:
seen[room.name] = True
deduped.append(room)
assert len(deduped) == 2
print(" PASS: test_room_deduplication")
def run_all():
print("=== tower_visual_mapper tests ===")
tests = [
test_gallery_image_to_room_known,
test_gallery_image_to_room_unknown,
test_gallery_image_to_room_philosophy,
test_parse_json_response_clean,
test_parse_json_response_fenced,
test_parse_json_response_garbage,
test_tower_map_structure,
test_to_json,
test_to_ascii,
test_to_ascii_with_npcs,
test_scan_gallery_index,
test_scan_wizard_configs,
test_build_tower_map_empty,
test_room_deduplication,
]
passed = 0
failed = 0
for test in tests:
try:
if "tmp_path" in test.__code__.co_varnames:
with tempfile.TemporaryDirectory() as td:
test(Path(td))
else:
test()
passed += 1
except Exception as e:
print(f" FAIL: {test.__name__}{e}")
failed += 1
print(f"\n{'ALL PASSED' if failed == 0 else f'{failed} FAILED'}: {passed}/{len(tests)}")
return failed == 0
if __name__ == "__main__":
sys.exit(0 if run_all() else 1)