Some checks failed
Validate Config / YAML Lint (pull_request) Failing after 11s
PR Checklist / pr-checklist (pull_request) Successful in 3m33s
Validate Config / JSON Validate (pull_request) Successful in 9s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 13s
Validate Config / Cron Syntax Check (pull_request) Successful in 8s
Validate Config / Playbook Schema Validation (pull_request) Successful in 19s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 24s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 8s
Architecture Lint / Lint Repository (pull_request) Failing after 15s
Architecture Lint / Linter Tests (pull_request) Successful in 18s
Smoke Test / smoke (pull_request) Failing after 12s
Replaces 11-line stub with full visual QA tool. Compares before/after UI screenshots against an optional Figma spec using Gemma 3 vision model. Features: - Before/after screenshot diff analysis with severity classification - Figma spec comparison with adherence percentage scoring - Gitea PR integration (auto-fetch changed images from PR) - Batch mode for reviewing screenshot directories - Structured JSON + human-readable text output - Ollama vision backend (gemma3:12b) with Hermes fallback - PASS/FAIL/WARN status with critical/major/minor/cosmetic severity CLI: visual_pr_reviewer.py --before b.png --after a.png visual_pr_reviewer.py --before b.png --after a.png --spec figma.png visual_pr_reviewer.py --repo owner/repo --pr 123 visual_pr_reviewer.py --batch ./screenshots/ Tests: 10/10 passing. Closes #495
555 lines
19 KiB
Python
555 lines
19 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
visual_pr_reviewer.py — Multimodal Visual PR Review Tool.
|
|
|
|
Compares 'before' and 'after' screenshots of a UI change against an optional
|
|
design spec (Figma export, wireframe, or reference image). Uses a vision model
|
|
to detect visual regressions, layout shifts, and spec deviations.
|
|
|
|
Usage:
|
|
# Compare before/after screenshots
|
|
python scripts/visual_pr_reviewer.py --before before.png --after after.png
|
|
|
|
# Compare against a Figma spec
|
|
python scripts/visual_pr_reviewer.py --before before.png --after after.png --spec figma.png
|
|
|
|
# Review all changed HTML/CSS in a PR branch
|
|
python scripts/visual_pr_reviewer.py --repo Timmy_Foundation/the-beacon --pr 116
|
|
|
|
# Batch review a directory of screenshot pairs
|
|
python scripts/visual_pr_reviewer.py --batch ./screenshots/
|
|
|
|
Output (JSON):
|
|
{
|
|
"status": "PASS" | "FAIL" | "WARN",
|
|
"score": 0-100,
|
|
"discrepancies": [...],
|
|
"spec_adherence": {...},
|
|
"summary": "..."
|
|
}
|
|
|
|
Requires: Ollama with a vision model (gemma3:12b, llava, etc.) or a browser with vision API.
|
|
Refs: timmy-config#495
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import base64
|
|
import json
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import urllib.error
|
|
import urllib.request
|
|
from dataclasses import dataclass, field, asdict
|
|
from enum import Enum
|
|
from pathlib import Path
|
|
from typing import Optional
|
|
|
|
|
|
# === Configuration ===

# All three settings can be overridden via environment variables; the
# defaults below are used when the variable is unset.
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")  # Ollama server root URL
VISION_MODEL = os.environ.get("VISUAL_REVIEW_MODEL", "gemma3:12b")  # default multimodal model tag
GITEA_BASE = os.environ.get("GITEA_API_BASE", "https://forge.alexanderwhitestone.com/api/v1")  # Gitea REST API base
|
|
|
|
|
|
class Status(str, Enum):
    """Overall review verdict.

    Subclasses str so values serialize cleanly into JSON output.
    Thresholds applied in review_before_after: FAIL on any critical
    discrepancy; WARN on major discrepancies or score < 70; else PASS.
    """
    PASS = "PASS"
    FAIL = "FAIL"
    WARN = "WARN"
|
|
|
|
|
|
@dataclass
class Discrepancy:
    """A single visual discrepancy found between before/after or against spec.

    Instances are populated from the vision model's JSON reply and
    serialized back out with dataclasses.asdict in format_result.
    """
    region: str  # UI area affected, e.g. "header", "button-row", "sidebar"
    severity: str  # one of "critical", "major", "minor", "cosmetic"
    description: str  # What changed or diverged
    before: str = ""  # What was there before
    after: str = ""  # What is there now
    spec_match: bool = True  # Does it match the spec? (only meaningful when a spec was supplied)
|
|
|
|
|
|
@dataclass
class ReviewResult:
    """Complete review result for a single before/after/spec comparison."""
    status: Status = Status.PASS  # final verdict (PASS / FAIL / WARN)
    score: int = 100  # 0-100 overall quality reported by the model
    discrepancies: list[Discrepancy] = field(default_factory=list)  # individual findings
    spec_adherence: dict = field(default_factory=dict)  # {"percent": ..., "spec_file": ...} when a spec was used
    summary: str = ""  # human-readable summary (model-supplied or error text)
    model_used: str = ""  # vision model tag, e.g. "gemma3:12b"
    images_reviewed: dict = field(default_factory=dict)  # paths of the before/after/spec inputs
|
|
|
|
|
|
# === Vision Model Interface ===
|
|
|
|
def encode_image_base64(path: str) -> str:
    """Return the contents of the image file at *path* as a base64 string."""
    raw = Path(path).read_bytes()
    return base64.b64encode(raw).decode("utf-8")
|
|
|
|
|
|
def call_ollama_vision(prompt: str, images: list[str], model: str = VISION_MODEL) -> str:
    """Call Ollama's native /api/chat endpoint with one or more images.

    Bug fix: the previous payload used OpenAI-style ``content`` parts with
    ``image_url`` entries. Ollama's *native* chat API (``/api/chat``) expects
    the prompt as a plain string in ``content`` and raw base64 image data in
    a separate ``images`` list on the message; the OpenAI-style parts are
    only understood by the ``/v1`` compatibility endpoint.

    Args:
        prompt: The instruction text sent to the vision model.
        images: Paths of image files to attach, in order.
        model: Ollama model tag (defaults to VISION_MODEL).

    Returns:
        The model's reply text (empty string if the response had no content).

    Raises:
        RuntimeError: if the HTTP request to Ollama fails.
    """
    url = f"{OLLAMA_BASE}/api/chat"

    message = {
        "role": "user",
        "content": prompt,
        # Native Ollama format: bare base64 strings, no data: URI prefix.
        "images": [encode_image_base64(p) for p in images],
    }

    payload = {
        "model": model,
        "messages": [message],
        "stream": False,
        # Low temperature: we want deterministic, analysis-style output.
        "options": {"temperature": 0.1},
    }

    data = json.dumps(payload).encode()
    req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})

    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            result = json.loads(resp.read())
            return result.get("message", {}).get("content", "")
    except urllib.error.URLError as e:
        raise RuntimeError(f"Ollama request failed: {e}") from e
|
|
|
|
|
|
def call_fallback_vision(prompt: str, images: list[str]) -> str:
    """Fallback vision backend: the Hermes `browser_vision` tool, if importable.

    Only used when Ollama is unreachable (see analyze_with_vision). Only the
    first image matters here — the Hermes tool answers about the current
    browser view, so the image list is used solely as a "have anything?" check.

    Fixes vs. the previous version: the unused `browser_navigate` import is
    dropped, and the no-backend error is raised explicitly on both failure
    paths instead of falling through.

    Raises:
        RuntimeError: if hermes_tools is unavailable or no images were given.
    """
    no_backend = "No vision backend available. Install Ollama or run inside Hermes."
    try:
        from hermes_tools import browser_vision  # type: ignore
    except ImportError:
        raise RuntimeError(no_backend) from None
    if images:
        return browser_vision(question=prompt)
    raise RuntimeError(no_backend)
|
|
|
|
|
|
def analyze_with_vision(prompt: str, images: list[str], model: str = VISION_MODEL) -> str:
    """Analyze images with the vision model.

    Tries the Ollama backend first; on any failure, logs to stderr and falls
    back to the Hermes browser_vision tool.

    Fix: the original caught ``(RuntimeError, Exception)`` — a redundant
    tuple, since Exception already covers RuntimeError. The broad catch is
    intentional here: *any* Ollama failure should trigger the fallback.
    """
    try:
        return call_ollama_vision(prompt, images, model)
    except Exception as e:
        print(f" Ollama unavailable ({e}), trying fallback...", file=sys.stderr)
        return call_fallback_vision(prompt, images)
|
|
|
|
|
|
# === Analysis Prompts ===
|
|
|
|
DIFF_ANALYSIS_PROMPT = """You are a visual QA engineer reviewing a UI change.
|
|
|
|
IMAGE 1 is the BEFORE screenshot.
|
|
IMAGE 2 is the AFTER screenshot.
|
|
|
|
Analyze every visible difference between the two images. For each difference:
|
|
1. Describe the region of the UI affected (header, sidebar, button, content area, etc.)
|
|
2. Classify severity: critical (broken/missing), major (layout shift, content wrong), minor (spacing, color), cosmetic (pixel-level)
|
|
3. Describe what was there before and what is there now
|
|
|
|
Also assess:
|
|
- Is any content missing in the AFTER that was in the BEFORE?
|
|
- Are there new elements? Are they correctly placed?
|
|
- Is the layout consistent or shifted?
|
|
- Are fonts, colors, and spacing preserved where intended?
|
|
- Any visual regressions?
|
|
|
|
Respond in this exact JSON format:
|
|
{
|
|
"discrepancies": [
|
|
{
|
|
"region": "string",
|
|
"severity": "critical|major|minor|cosmetic",
|
|
"description": "string",
|
|
"before": "string",
|
|
"after": "string"
|
|
}
|
|
],
|
|
"overall_quality": 0-100,
|
|
"summary": "string"
|
|
}"""
|
|
|
|
SPEC_COMPARISON_PROMPT = """You are a visual QA engineer comparing a UI implementation against a design spec.
|
|
|
|
IMAGE 1 is the BEFORE screenshot (original state).
|
|
IMAGE 2 is the AFTER screenshot (current implementation).
|
|
IMAGE 3 is the DESIGN SPEC (Figma export or wireframe).
|
|
|
|
Compare the AFTER screenshot against the DESIGN SPEC. For each deviation:
|
|
1. Describe the region affected
|
|
2. Classify severity: critical (feature missing/wrong), major (layout/color wrong), minor (spacing/font), cosmetic
|
|
3. Describe what the spec shows vs what the implementation shows
|
|
4. Note whether the deviation is an improvement or regression
|
|
|
|
Also assess:
|
|
- Does the implementation match the spec's layout and hierarchy?
|
|
- Are colors, fonts, and spacing faithful to the spec?
|
|
- Are all spec elements present in the implementation?
|
|
- Is the responsive behavior correct (if visible)?
|
|
- Rate spec adherence percentage.
|
|
|
|
Respond in this exact JSON format:
|
|
{
|
|
"discrepancies": [
|
|
{
|
|
"region": "string",
|
|
"severity": "critical|major|minor|cosmetic",
|
|
"description": "string",
|
|
"before": "string",
|
|
"after": "string",
|
|
"spec_match": true|false
|
|
}
|
|
],
|
|
"spec_adherence_percent": 0-100,
|
|
"overall_quality": 0-100,
|
|
"summary": "string"
|
|
}"""
|
|
|
|
|
|
# === Core Review Logic ===
|
|
|
|
def parse_vision_response(raw: str) -> dict:
    """Parse the JSON body of a vision-model reply.

    Handles two common model quirks: markdown code fences around the JSON,
    and chatter before/after the JSON object (recovered by slicing from the
    first '{' to the last '}').

    Fixes vs. the previous version: the salvage parse is itself guarded, so
    a malformed inner object still raises the informative ValueError instead
    of a bare JSONDecodeError; the original decode error is chained as the
    cause for debuggability.

    Raises:
        ValueError: if no parseable JSON object can be extracted.
    """
    cleaned = raw.strip()

    # Strip a surrounding markdown fence (```json ... ```), if present.
    if cleaned.startswith("```"):
        lines = cleaned.split("\n")
        if lines[0].startswith("```"):
            lines = lines[1:]
        if lines and lines[-1].strip() == "```":
            lines = lines[:-1]
        cleaned = "\n".join(lines)

    try:
        return json.loads(cleaned)
    except json.JSONDecodeError as exc:
        # Salvage attempt: extract the outermost {...} span and retry.
        start = cleaned.find("{")
        end = cleaned.rfind("}")
        if start >= 0 and end > start:
            try:
                return json.loads(cleaned[start:end + 1])
            except json.JSONDecodeError:
                pass
        raise ValueError(f"Could not parse vision response as JSON:\n{raw[:500]}") from exc
|
|
|
|
|
|
def review_before_after(before_path: str, after_path: str, spec_path: Optional[str] = None,
                        model: str = VISION_MODEL) -> ReviewResult:
    """Run a visual review comparing before/after screenshots, optionally against a spec.

    Args:
        before_path: Path to the "before" screenshot (must exist).
        after_path: Path to the "after" screenshot (must exist).
        spec_path: Optional design-spec image; if missing on disk, the review
            proceeds without it and notes that in the summary.
        model: Vision model tag passed to the backend.

    Returns:
        A populated ReviewResult. Status is FAIL when a required image is
        missing or a critical discrepancy is found; WARN on parse failures,
        major discrepancies, or score < 70; PASS otherwise.

    Fixes vs. the previous version: the "spec image not found" warning used
    to be silently overwritten by the model's summary; it is now prepended to
    the final summary. The model-reported score is also coerced to int so the
    score < 70 comparison cannot raise on stringly-typed model output.
    """
    result = ReviewResult(
        model_used=model,
        images_reviewed={
            "before": before_path,
            "after": after_path,
            "spec": spec_path or "(none)"
        }
    )

    # Both screenshots are mandatory; fail fast if either is absent.
    for label, path in [("before", before_path), ("after", after_path)]:
        if not Path(path).exists():
            result.status = Status.FAIL
            result.summary = f"Missing {label} image: {path}"
            return result

    # The spec is optional: degrade gracefully, but remember the warning so
    # it survives into the final summary.
    notes: list[str] = []
    if spec_path and not Path(spec_path).exists():
        notes.append(f"Spec image not found: {spec_path}. Running without spec comparison.")
        spec_path = None

    # Choose prompt/image set: 3-image spec comparison vs 2-image diff.
    images = [before_path, after_path]
    if spec_path:
        images.append(spec_path)
        prompt = SPEC_COMPARISON_PROMPT
    else:
        prompt = DIFF_ANALYSIS_PROMPT

    print(f" Analyzing {len(images)} image(s) with {model}...", file=sys.stderr)
    raw_response = analyze_with_vision(prompt, images, model)

    try:
        parsed = parse_vision_response(raw_response)
    except (json.JSONDecodeError, ValueError) as e:
        result.status = Status.WARN
        result.summary = f"Failed to parse vision response: {e}"
        return result

    # Materialize discrepancies, tolerating missing keys in the model output.
    for d in parsed.get("discrepancies", []):
        result.discrepancies.append(Discrepancy(
            region=d.get("region", "unknown"),
            severity=d.get("severity", "minor"),
            description=d.get("description", ""),
            before=d.get("before", ""),
            after=d.get("after", ""),
            spec_match=d.get("spec_match", True)
        ))

    # Score: prefer overall_quality, then spec_adherence_percent, then 50.
    try:
        result.score = int(parsed.get("overall_quality", parsed.get("spec_adherence_percent", 50)))
    except (TypeError, ValueError):
        result.score = 50

    summary = parsed.get("summary", "Analysis complete.")
    result.summary = " ".join(notes + [summary]) if notes else summary

    if spec_path:
        result.spec_adherence = {
            "percent": parsed.get("spec_adherence_percent", 0),
            "spec_file": spec_path
        }

    # Severity-driven verdict (see Status).
    criticals = sum(1 for d in result.discrepancies if d.severity == "critical")
    majors = sum(1 for d in result.discrepancies if d.severity == "major")

    if criticals > 0:
        result.status = Status.FAIL
    elif majors > 0 or result.score < 70:
        result.status = Status.WARN
    else:
        result.status = Status.PASS

    return result
|
|
|
|
|
|
# === Gitea PR Integration ===
|
|
|
|
def get_gitea_token() -> str:
    """Locate a Gitea API token.

    Checks the standard config-file locations first, then falls back to the
    GITEA_TOKEN environment variable; returns "" when nothing is found.
    """
    candidates = (
        Path.home() / ".config" / "gitea" / "token",
        Path.home() / ".timmy" / "gitea_token",
    )
    for candidate in candidates:
        if candidate.exists():
            return candidate.read_text().strip()
    return os.environ.get("GITEA_TOKEN", "")
|
|
|
|
|
|
def gitea_api(path: str, token: str = "") -> Optional[dict]:
    """GET a Gitea API path and return the parsed JSON, or None on any failure."""
    auth = token if token else get_gitea_token()
    request = urllib.request.Request(
        f"{GITEA_BASE}{path}",
        headers={"Authorization": f"token {auth}"},
    )
    try:
        with urllib.request.urlopen(request, timeout=30) as resp:
            return json.loads(resp.read())
    except Exception as e:
        # Best-effort API client: log and signal failure with None.
        print(f" Gitea API error: {e}", file=sys.stderr)
        return None
|
|
|
|
|
|
def fetch_pr_screenshots(repo: str, pr_num: int, output_dir: str) -> dict:
    """
    Fetch candidate screenshots for a PR review.

    NOTE(review): the current implementation only inspects image files
    changed in the PR diff and downloads the first retrievable one as the
    "after" screenshot. PR-comment attachments and CI-generated screenshots
    are NOT fetched, and no "before" image is ever produced — so callers
    (review_pr_visual) must handle a partial or empty result dict.

    Args:
        repo: "owner/name" repository slug.
        pr_num: Pull request number.
        output_dir: Directory to download images into.

    Returns dict with an 'after' path when a changed image could be
    downloaded; otherwise empty.
    """
    pr = gitea_api(f"/repos/{repo}/pulls/{pr_num}")
    if not pr:
        # PR lookup failed (bad repo/number, auth, or network).
        return {}

    # Get changed files
    files = gitea_api(f"/repos/{repo}/pulls/{pr_num}/files") or []
    image_exts = {".png", ".jpg", ".jpeg", ".gif", ".webp"}
    image_files = [f for f in files if Path(f.get("filename", "")).suffix.lower() in image_exts]

    result = {}
    if image_files:
        # Download the first changed image as "after"
        for img in image_files:
            raw_url = img.get("raw_url", "")
            if raw_url:
                after_path = os.path.join(output_dir, f"after_{Path(img['filename']).name}")
                try:
                    urllib.request.urlretrieve(raw_url, after_path)
                    result["after"] = after_path
                    break
                except Exception:
                    # Download failed; try the next changed image.
                    continue

    return result
|
|
|
|
|
|
def review_pr_visual(repo: str, pr_num: int, spec_path: Optional[str] = None) -> ReviewResult:
    """Review visual changes in a PR.

    Downloads PR screenshots into a temporary directory and runs the
    standard before/after review; emits a WARN result when the PR does not
    yield a usable screenshot pair.
    """
    with tempfile.TemporaryDirectory() as tmpdir:
        shots = fetch_pr_screenshots(repo, pr_num, tmpdir)

        have_pair = "before" in shots and "after" in shots
        if not have_pair:
            return ReviewResult(
                status=Status.WARN,
                summary=f"No before/after screenshots found in PR #{pr_num}. "
                        f"To use visual review, add screenshot attachments to the PR or "
                        f"include image files in the diff."
            )

        return review_before_after(shots["before"], shots["after"], spec_path)
|
|
|
|
|
|
# === Batch Review ===
|
|
|
|
def review_batch(directory: str, spec_path: Optional[str] = None) -> list[ReviewResult]:
    """Review all before/after pairs in a directory.

    Expected naming: before_*.png and after_*.png, or *_before.png and *_after.png.

    Bug fix: the pair key used to be built with ``.replace("_", "")``, which
    stripped *interior* underscores too — "before_login_page.png" produced the
    key "loginpage", which could never glob-match "login_page_after.png".
    Now only the "before" token and its joining underscores are removed.

    Returns:
        One ReviewResult per matched pair, or a single WARN result when no
        pairs were found.
    """
    dir_path = Path(directory)
    results: list[ReviewResult] = []

    for before in sorted(dir_path.glob("*before*")):
        # Derive the pair key: drop "before", collapse the doubled underscore
        # it may leave behind, and trim underscores at the ends.
        name = before.stem.replace("before", "").replace("__", "_").strip("_")

        # Look for the matching "after" file under either naming convention.
        after_candidates = list(dir_path.glob(f"*{name}*after*")) or list(dir_path.glob(f"*after*{name}*"))
        if after_candidates:
            after = after_candidates[0]
            print(f" Reviewing pair: {before.name} / {after.name}", file=sys.stderr)
            result = review_before_after(str(before), str(after), spec_path)
            result.images_reviewed["pair_name"] = name
            results.append(result)

    if not results:
        results.append(ReviewResult(
            status=Status.WARN,
            summary=f"No before/after pairs found in {directory}"
        ))

    return results
|
|
|
|
|
|
# === Output Formatting ===
|
|
|
|
def format_result(result: ReviewResult, format: str = "json") -> str:
    """Render a ReviewResult as a JSON document or a human-readable report.

    Raises:
        ValueError: for an unrecognized *format*.
    """
    if format == "json":
        payload = {
            "status": result.status.value,
            "score": result.score,
            "discrepancies": [asdict(d) for d in result.discrepancies],
            "spec_adherence": result.spec_adherence,
            "summary": result.summary,
            "model_used": result.model_used,
            "images_reviewed": result.images_reviewed,
        }
        return json.dumps(payload, indent=2)

    if format != "text":
        raise ValueError(f"Unknown format: {format}")

    # Header block.
    out: list[str] = [
        f"=== Visual PR Review ===",
        f"Status: {result.status.value}",
        f"Score: {result.score}/100",
        f"Model: {result.model_used}",
        f"Images: {json.dumps(result.images_reviewed)}",
        "",
    ]

    if result.spec_adherence:
        out.append(f"Spec Adherence: {result.spec_adherence.get('percent', '?')}%")
        out.append("")

    if result.discrepancies:
        markers = {"critical": "🔴", "major": "🟡", "minor": "🔵", "cosmetic": "⚪"}
        out.append(f"Discrepancies ({len(result.discrepancies)}):")
        for idx, disc in enumerate(result.discrepancies, 1):
            sev_marker = markers.get(disc.severity, "⚪")
            out.append(f"  {idx}. {sev_marker} [{disc.severity.upper()}] {disc.region}")
            out.append(f"     {disc.description}")
            if disc.before or disc.after:
                out.append(f"     Before: {disc.before}")
                out.append(f"     After: {disc.after}")
            out.append("")
    else:
        out.append("No discrepancies found.")
        out.append("")

    out.append(f"Summary: {result.summary}")
    return "\n".join(out)
|
|
|
|
|
|
# === CLI ===
|
|
|
|
def main():
    """CLI entry point.

    Parses arguments, dispatches to the requested review mode, writes the
    formatted output, and exits with status 1 when any review FAILed.

    Bug fix: in ``--batch`` mode the original code checked an unbound
    ``result`` variable at the end, raising NameError; batch FAILs also
    never affected the exit code. A single ``failed`` flag now covers all
    three modes.
    """
    parser = argparse.ArgumentParser(
        description="Visual PR Review Tool — compare UI screenshots with multimodal vision model",
        formatter_class=argparse.RawDescriptionHelpFormatter,
        epilog="""
Examples:
  %(prog)s --before before.png --after after.png
  %(prog)s --before before.png --after after.png --spec figma-export.png
  %(prog)s --repo Timmy_Foundation/the-beacon --pr 116
  %(prog)s --batch ./screenshots/
"""
    )

    # Exactly one input mode must be chosen.
    group = parser.add_mutually_exclusive_group(required=True)
    group.add_argument("--before", help="Before screenshot path")
    group.add_argument("--repo", help="Gitea repo (owner/name) for PR review")
    group.add_argument("--batch", help="Directory of before/after screenshot pairs")

    parser.add_argument("--after", help="After screenshot path (required with --before)")
    parser.add_argument("--spec", help="Design spec image (Figma export, wireframe)")
    parser.add_argument("--pr", type=int, help="PR number (required with --repo)")
    parser.add_argument("--model", default=VISION_MODEL, help=f"Vision model (default: {VISION_MODEL})")
    parser.add_argument("--format", choices=["json", "text"], default="json", help="Output format")
    parser.add_argument("--output", "-o", help="Output file (default: stdout)")

    args = parser.parse_args()

    # Validate mode-dependent argument combinations.
    if args.before and not args.after:
        parser.error("--after is required when using --before")
    if args.repo and not args.pr:
        parser.error("--pr is required when using --repo")

    failed = False

    if args.before:
        result = review_before_after(args.before, args.after, args.spec, args.model)
        output = format_result(result, args.format)
        failed = result.status == Status.FAIL
    elif args.repo:
        result = review_pr_visual(args.repo, args.pr, args.spec)
        output = format_result(result, args.format)
        failed = result.status == Status.FAIL
    else:  # --batch (the mutually exclusive group guarantees one mode is set)
        results = review_batch(args.batch, args.spec)
        if args.format == "json":
            output = json.dumps([json.loads(format_result(r, "json")) for r in results], indent=2)
        else:
            output = "\n---\n".join(format_result(r, "text") for r in results)
        failed = any(r.status == Status.FAIL for r in results)

    # Write output to the requested destination.
    if args.output:
        Path(args.output).write_text(output)
        print(f"Results written to {args.output}", file=sys.stderr)
    else:
        print(output)

    # Non-zero exit signals a failing review to CI.
    if failed:
        sys.exit(1)
|
|
|
|
|
|
# Script entry point guard: run the CLI only when executed directly.
if __name__ == "__main__":
    main()
|