#!/usr/bin/env python3
|
|
"""
|
|
Vision Benchmark Suite — Issue #817
|
|
|
|
Compares Gemma 4 vision accuracy vs current approach (Gemini 3 Flash Preview).
|
|
Measures OCR accuracy, description quality, latency, and token usage.
|
|
|
|
Usage:
|
|
# Run full benchmark
|
|
python benchmarks/vision_benchmark.py --images benchmarks/test_images.json
|
|
|
|
# Single image test
|
|
python benchmarks/vision_benchmark.py --url https://example.com/image.png
|
|
|
|
# Generate test report
|
|
python benchmarks/vision_benchmark.py --images benchmarks/test_images.json --output benchmarks/vision_results.json
|
|
|
|
Test image dataset: benchmarks/test_images.json (50-100 diverse images)
|
|
"""
|
|
|
|
import argparse
|
|
import asyncio
|
|
import base64
|
|
import json
|
|
import mimetypes
|
|
import os
|
|
import statistics
|
|
import sys
|
|
import time
|
|
import urllib.request
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
|
|
# ---------------------------------------------------------------------------
# Benchmark configuration
# ---------------------------------------------------------------------------

# Models to compare. Each entry names a primary (provider, model_id) lane and
# an optional fallback lane that analyze_with_model() tries only if the
# primary attempt raises. API keys are resolved per provider from environment
# variables (see _provider_key inside analyze_with_model).
MODELS = {
    "gemma4": {
        "model_id": "google/gemma-4-27b-it",
        "display_name": "Gemma 4 27B",
        "provider": "nous",
        # Fallback runs against a local Ollama server (no API key required).
        "fallback_provider": "ollama",
        "fallback_model_id": "gemma4:latest",
        "description": "Google's multimodal Gemma 4 model",
    },
    "gemini3_flash": {
        "model_id": "google/gemini-3-flash-preview",
        "display_name": "Gemini 3 Flash Preview",
        "provider": "openrouter",
        "fallback_provider": "gemini",
        "fallback_model_id": "gemini-2.5-flash",
        "description": "Current default vision model",
    },
}

# Evaluation prompts for different test categories. run_single_test() selects
# the prompt by the test image's "category" field and falls back to the
# "photo" prompt for unknown categories.
EVAL_PROMPTS = {
    "screenshot": "Describe this screenshot in detail. What application is shown? What is the current state of the UI?",
    "diagram": "Describe this diagram completely. What concepts does it illustrate? List all components and their relationships.",
    "photo": "Describe this photo in detail. What objects are visible? What is the scene?",
    "ocr": "Extract ALL text visible in this image. Return it exactly as written, preserving formatting.",
    "chart": "What data does this chart show? List all axes labels, values, and key trends.",
    "document": "Extract all text from this document image. Preserve paragraph structure.",
}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Vision model interface
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def analyze_with_model(
    image_url: str,
    prompt: str,
    model_config: dict,
    timeout: float = 120.0,
) -> dict:
    """Call a vision model and return structured results.

    Tries the primary (provider, model_id) pair from ``model_config`` first
    and, if that attempt raises, the optional ``fallback_provider`` /
    ``fallback_model_id`` pair. The image bytes are fetched (or read from
    disk) at most once and cached for all attempts.

    Args:
        image_url: http(s) URL or local filesystem path of the image.
        prompt: instruction text sent alongside the image.
        model_config: one entry of ``MODELS`` (provider, model_id, plus
            optional fallback fields).
        timeout: per-request HTTP timeout in seconds.

    Returns dict with:
        - analysis: str
        - latency_ms: float
        - tokens: dict (prompt_tokens, completion_tokens, total_tokens)
        - success: bool
        - error: str (if failed; "|"-joined messages from every attempt)
        - provider_used / model_used: the candidate that produced the
          result (or the last candidate attempted, on total failure)
    """
    import httpx

    def _load_image_bytes_cached() -> tuple[bytes, str]:
        # Fetch/read the image once and memoize it in the enclosing scope so
        # a fallback attempt does not re-download it.
        # NOTE(review): urllib here is synchronous and blocks the event loop
        # while downloading; acceptable for this sequential benchmark script.
        nonlocal _image_bytes, _mime_type
        if _image_bytes is not None:
            return _image_bytes, _mime_type
        if image_url.startswith(("http://", "https://")):
            with urllib.request.urlopen(image_url, timeout=30) as resp:
                _image_bytes = resp.read()
                # Prefer the server-reported content type, then the URL
                # extension, then a PNG default.
                _mime_type = resp.headers.get_content_type() or mimetypes.guess_type(image_url)[0] or "image/png"
        else:
            path = Path(image_url).expanduser()
            _image_bytes = path.read_bytes()
            _mime_type = mimetypes.guess_type(str(path))[0] or "image/png"
        return _image_bytes, _mime_type

    def _data_url() -> str:
        # Base64 "data:" URL form, used when the image is a local file that
        # a remote API cannot fetch by URL.
        image_bytes, mime_type = _load_image_bytes_cached()
        return f"data:{mime_type};base64,{base64.b64encode(image_bytes).decode()}"

    def _provider_key(provider: str) -> str:
        # Resolve the API key for a provider from environment variables.
        # Returns "" when unset; callers decide whether that is fatal
        # (ollama needs no key).
        if provider == "openrouter":
            return os.getenv("OPENROUTER_API_KEY", "")
        if provider == "nous":
            return os.getenv("NOUS_API_KEY", "") or os.getenv("NOUS_INFERENCE_API_KEY", "")
        if provider == "gemini":
            return os.getenv("GEMINI_API_KEY", "") or os.getenv("GOOGLE_API_KEY", "")
        return os.getenv(f"{provider.upper()}_API_KEY", "")

    provider = model_config["provider"]
    model_id = model_config["model_id"]
    # Primary candidate first; fallback lane appended only when both
    # fallback fields are present.
    candidates = [(provider, model_id)]
    if model_config.get("fallback_provider") and model_config.get("fallback_model_id"):
        candidates.append((model_config["fallback_provider"], model_config["fallback_model_id"]))

    # Cache slots for _load_image_bytes_cached (bound here, after the helper
    # definitions, but before any helper runs).
    _image_bytes: Optional[bytes] = None
    _mime_type = "image/png"
    failures = []

    for candidate_provider, candidate_model in candidates:
        api_key = _provider_key(candidate_provider)
        start = time.perf_counter()
        try:
            if candidate_provider in {"openrouter", "nous"}:
                # Both expose OpenAI-compatible chat-completions endpoints.
                api_url = (
                    "https://openrouter.ai/api/v1/chat/completions"
                    if candidate_provider == "openrouter"
                    else "https://inference.nousresearch.com/v1/chat/completions"
                )
                if not api_key:
                    raise RuntimeError(f"No API key for provider {candidate_provider}")
                payload = {
                    "model": candidate_model,
                    "messages": [{
                        "role": "user",
                        "content": [
                            {"type": "text", "text": prompt},
                            # Remote URLs are passed through; local files are
                            # inlined as base64 data: URLs.
                            {"type": "image_url", "image_url": {"url": _data_url() if not image_url.startswith(("http://", "https://")) else image_url}},
                        ],
                    }],
                    "max_tokens": 2000,
                    "temperature": 0.1,
                }
                headers = {
                    "Authorization": f"Bearer {api_key}",
                    "Content-Type": "application/json",
                }
                async with httpx.AsyncClient(timeout=timeout) as client:
                    resp = await client.post(api_url, json=payload, headers=headers)
                    resp.raise_for_status()
                    data = resp.json()
                analysis = data.get("choices", [{}])[0].get("message", {}).get("content", "")
                usage = data.get("usage", {})
                tokens = {
                    "prompt_tokens": usage.get("prompt_tokens", 0),
                    "completion_tokens": usage.get("completion_tokens", 0),
                    "total_tokens": usage.get("total_tokens", 0),
                }
            elif candidate_provider == "gemini":
                # Native Gemini generateContent API; image is always inlined
                # as base64 regardless of source.
                if not api_key:
                    raise RuntimeError("No API key for provider gemini")
                image_bytes, mime_type = _load_image_bytes_cached()
                api_url = f"https://generativelanguage.googleapis.com/v1beta/models/{candidate_model}:generateContent?key={api_key}"
                payload = {
                    "contents": [{"parts": [
                        {"text": prompt},
                        {"inline_data": {"mime_type": mime_type, "data": base64.b64encode(image_bytes).decode()}},
                    ]}],
                    "generationConfig": {"temperature": 0.1, "maxOutputTokens": 2000},
                }
                async with httpx.AsyncClient(timeout=timeout) as client:
                    resp = await client.post(api_url, json=payload)
                    resp.raise_for_status()
                    data = resp.json()
                parts = data.get("candidates", [{}])[0].get("content", {}).get("parts", [])
                analysis = "\n".join(part.get("text", "") for part in parts if isinstance(part, dict) and part.get("text"))
                usage = data.get("usageMetadata", {})
                tokens = {
                    "prompt_tokens": usage.get("promptTokenCount", 0),
                    "completion_tokens": usage.get("candidatesTokenCount", 0),
                    "total_tokens": usage.get("totalTokenCount", 0),
                }
            elif candidate_provider == "ollama":
                # Local Ollama server; no API key, image inlined as base64.
                image_bytes, _ = _load_image_bytes_cached()
                payload = {
                    "model": candidate_model,
                    "stream": False,
                    "messages": [{"role": "user", "content": prompt, "images": [base64.b64encode(image_bytes).decode()]}],
                    "options": {"temperature": 0.1},
                }
                async with httpx.AsyncClient(timeout=timeout) as client:
                    resp = await client.post("http://localhost:11434/api/chat", json=payload)
                    resp.raise_for_status()
                    data = resp.json()
                analysis = data.get("message", {}).get("content", "")
                tokens = {
                    "prompt_tokens": data.get("prompt_eval_count", 0),
                    "completion_tokens": data.get("eval_count", 0),
                    "total_tokens": (data.get("prompt_eval_count", 0) or 0) + (data.get("eval_count", 0) or 0),
                }
            else:
                raise RuntimeError(f"Unsupported provider {candidate_provider}")

            latency_ms = (time.perf_counter() - start) * 1000
            return {
                "analysis": analysis,
                "latency_ms": round(latency_ms, 1),
                "tokens": tokens,
                "success": True,
                "error": "",
                "provider_used": candidate_provider,
                "model_used": candidate_model,
            }
        except Exception as e:
            # Record the failure and fall through to the next candidate.
            failures.append(f"{candidate_provider}:{candidate_model} => {e}")

    # All candidates failed: report every attempt's error, attributed to the
    # last candidate tried.
    return {
        "analysis": "",
        "latency_ms": 0,
        "tokens": {},
        "success": False,
        "error": " | ".join(failures) if failures else "No runs",
        "provider_used": candidates[-1][0] if candidates else provider,
        "model_used": candidates[-1][1] if candidates else model_id,
    }
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Evaluation metrics
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def compute_ocr_accuracy(extracted: str, ground_truth: str) -> float:
    """Compute OCR accuracy as a blend of character and word similarity.

    Character-level similarity uses ``difflib.SequenceMatcher`` over the
    lower-cased, stripped strings (weighted 40%); word-level recall against
    the ground-truth vocabulary contributes the remaining 60%.

    This replaces the previous same-position character compare, which
    collapsed to ~0 after a single inserted or dropped character shifted the
    alignment — SequenceMatcher's ratio is robust to insertions/deletions.

    Args:
        extracted: text the model extracted from the image.
        ground_truth: reference text for the image.

    Returns:
        Score in [0.0, 1.0], rounded to 4 decimals (1.0 = perfect match).
    """
    from difflib import SequenceMatcher

    if not ground_truth:
        # No reference text: only an empty extraction counts as correct.
        return 1.0 if not extracted else 0.0
    if not extracted:
        return 0.0

    extracted_lower = extracted.lower().strip()
    truth_lower = ground_truth.lower().strip()

    # Both strings were pure whitespace: treat as a perfect (empty) match.
    if not extracted_lower and not truth_lower:
        return 1.0

    # Character-level similarity, tolerant of small edits.
    char_ratio = SequenceMatcher(None, extracted_lower, truth_lower).ratio()

    # Word-level recall: fraction of ground-truth words recovered.
    extracted_words = set(extracted_lower.split())
    truth_words = set(truth_lower.split())
    if truth_words:
        word_recall = len(extracted_words & truth_words) / len(truth_words)
    else:
        word_recall = 1.0 if not extracted_words else 0.0

    return round(char_ratio * 0.4 + word_recall * 0.6, 4)
|
|
|
|
|
|
def compute_description_completeness(analysis: str, expected_keywords: list) -> float:
    """Score how completely *analysis* covers *expected_keywords*.

    Each keyword is checked as a case-insensitive substring of the analysis;
    the score is the covered fraction, rounded to 4 decimals.

    Args:
        analysis: the model's description text.
        expected_keywords: keywords that a complete description mentions.

    Returns:
        0.0-1.0 (1.0 when every keyword appears, or when none are expected).
    """
    if not expected_keywords:
        return 1.0
    if not analysis:
        return 0.0

    haystack = analysis.lower()
    hits = 0
    for keyword in expected_keywords:
        if keyword.lower() in haystack:
            hits += 1
    return round(hits / len(expected_keywords), 4)
|
|
|
|
|
|
def compute_structural_accuracy(analysis: str, expected_structure: dict) -> dict:
    """Evaluate structural elements of the analysis.

    Args:
        analysis: the model's output text.
        expected_structure: optional thresholds — "min_length" (default 50),
            "min_sentences" (default 2), and "has_numbers" (default False).

    Returns:
        Dict of per-element scores in [0.0, 1.0]; "has_numbers" is included
        only when requested.
    """
    import re

    scores: dict = {}

    # Length score: fraction of the required minimum, capped at 1.0.
    min_length = expected_structure.get("min_length", 50)
    if min_length > 0:
        scores["length"] = min(len(analysis) / min_length, 1.0)
    else:
        scores["length"] = 1.0

    # Sentence score: terminal-punctuation count vs the required minimum.
    min_sentences = expected_structure.get("min_sentences", 2)
    terminal_count = sum(analysis.count(mark) for mark in ".!?")
    scores["sentences"] = min(terminal_count / max(min_sentences, 1), 1.0)

    # Optional check that the text contains at least one digit.
    if expected_structure.get("has_numbers", False):
        scores["has_numbers"] = 1.0 if re.search(r"\d", analysis) else 0.0

    return scores
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Benchmark runner
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def run_single_test(
    image: dict,
    models: dict,
    runs_per_model: int = 1,
) -> dict:
    """Run a single image through all models.

    Args:
        image: dict with url, category, expected_keywords, ground_truth_ocr,
            expected_structure, etc.
        models: dict of model configs to test
        runs_per_model: number of runs per model (for consistency testing)

    Returns dict with results per model (keyed by model name); each entry is
    either an aggregated success record or a failure record carrying the
    first run's error message.
    """
    # Pick the category-specific prompt; unknown categories fall back to the
    # generic "photo" prompt.
    category = image.get("category", "photo")
    prompt = EVAL_PROMPTS.get(category, EVAL_PROMPTS["photo"])
    url = image["url"]

    results = {}

    for model_name, model_config in models.items():
        runs = []
        for run_i in range(runs_per_model):
            result = await analyze_with_model(url, prompt, model_config)
            runs.append(result)
            if run_i < runs_per_model - 1:
                await asyncio.sleep(1)  # Rate limit courtesy

        # Aggregate over successful runs only.
        successful = [r for r in runs if r["success"]]
        if successful:
            avg_latency = statistics.mean(r["latency_ms"] for r in successful)
            avg_tokens = statistics.mean(
                r["tokens"].get("total_tokens", 0) for r in successful
            )
            # Use first successful run for accuracy metrics
            primary = successful[0]

            # Accuracy metrics are only computed when the test case supplies
            # the corresponding reference data; otherwise they stay None.
            ocr_score = None
            if image.get("ground_truth_ocr"):
                ocr_score = compute_ocr_accuracy(
                    primary["analysis"], image["ground_truth_ocr"]
                )

            keyword_score = None
            if image.get("expected_keywords"):
                keyword_score = compute_description_completeness(
                    primary["analysis"], image["expected_keywords"]
                )

            structural = compute_structural_accuracy(
                primary["analysis"], image.get("expected_structure", {})
            )

            results[model_name] = {
                "success": True,
                "analysis_preview": primary["analysis"][:300],
                "analysis_length": len(primary["analysis"]),
                "avg_latency_ms": round(avg_latency, 1),
                "avg_tokens": round(avg_tokens, 1),
                "ocr_accuracy": ocr_score,
                "keyword_completeness": keyword_score,
                "structural_scores": structural,
                # Std-dev of output length across runs; lower = more
                # consistent. Only meaningful with >1 successful run.
                "consistency": round(
                    statistics.stdev(len(r["analysis"]) for r in successful), 1
                ) if len(successful) > 1 else 0.0,
                "runs": len(successful),
                "errors": len(runs) - len(successful),
            }
        else:
            # Every run failed for this model; surface the first error.
            results[model_name] = {
                "success": False,
                "error": runs[0]["error"] if runs else "No runs",
                "runs": 0,
                "errors": len(runs),
            }

    return results
|
|
|
|
|
|
async def run_benchmark_suite(
    images: List[dict],
    models: dict,
    runs_per_model: int = 1,
) -> dict:
    """Run the full benchmark suite.

    Images are processed sequentially (one await at a time), printing a
    one-line progress status per image.

    Args:
        images: list of image test cases
        models: model configs to compare
        runs_per_model: consistency runs per image

    Returns structured benchmark report with "generated_at", "config",
    per-image "results", and the aggregated "summary".
    """
    total = len(images)
    all_results = []

    print(f"\nRunning vision benchmark: {total} images x {len(models)} models x {runs_per_model} runs")
    print(f"Models: {', '.join(m['display_name'] for m in models.values())}\n")

    for i, image in enumerate(images):
        img_id = image.get("id", f"img_{i}")
        category = image.get("category", "unknown")
        print(f" [{i+1}/{total}] {img_id} ({category})...", end=" ", flush=True)

        result = await run_single_test(image, models, runs_per_model)
        # Tag the per-model result dict with the image's identity so the
        # aggregator and report can reference it.
        result["image_id"] = img_id
        result["category"] = category
        all_results.append(result)

        # Quick per-model status line (latency or FAIL).
        statuses = []
        for mname in models:
            if result[mname]["success"]:
                lat = result[mname]["avg_latency_ms"]
                statuses.append(f"{mname}:{lat:.0f}ms")
            else:
                statuses.append(f"{mname}:FAIL")
        print(", ".join(statuses))

    # Aggregate statistics across all images.
    summary = aggregate_results(all_results, models)

    return {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "config": {
            "total_images": total,
            "runs_per_model": runs_per_model,
            "models": {k: v["display_name"] for k, v in models.items()},
        },
        "results": all_results,
        "summary": summary,
    }
|
|
|
|
|
|
def aggregate_results(results: List[dict], models: dict) -> dict:
    """Compute aggregate statistics across all test images.

    Args:
        results: per-image dicts keyed by model name (as produced by
            ``run_single_test``), each model entry carrying a ``success``
            flag.
        models: model-config mapping; its keys select which entries of each
            result dict are aggregated.

    Returns:
        Mapping of model name -> stats: success rate, latency distribution,
        token usage, accuracy means, and up to three distinct failure
        examples. Models with zero successes get a reduced record with an
        "All runs failed" error instead.
    """
    summary: Dict[str, Any] = {}

    for name in models:
        ok_runs: List[dict] = []
        bad_runs: List[dict] = []
        for entry in results:
            (ok_runs if entry[name]["success"] else bad_runs).append(entry[name])

        # Up to three distinct error strings, in deterministic sorted order.
        examples = sorted({b.get("error", "unknown failure") for b in bad_runs})[:3]

        if not ok_runs:
            summary[name] = {
                "success_rate": 0,
                "error": "All runs failed",
                "total_runs": 0,
                "total_failures": len(bad_runs),
                "failure_examples": examples,
            }
            continue

        latencies = [run["avg_latency_ms"] for run in ok_runs]
        token_means = [run["avg_tokens"] for run in ok_runs if run.get("avg_tokens")]
        ocr_vals = [run["ocr_accuracy"] for run in ok_runs if run.get("ocr_accuracy") is not None]
        kw_vals = [run["keyword_completeness"] for run in ok_runs if run.get("keyword_completeness") is not None]

        ordered_latencies = sorted(latencies)
        summary[name] = {
            "success_rate": round(len(ok_runs) / (len(ok_runs) + len(bad_runs)), 4),
            "total_runs": len(ok_runs),
            "total_failures": len(bad_runs),
            "failure_examples": examples,
            "latency": {
                "mean_ms": round(statistics.mean(latencies), 1),
                "median_ms": round(statistics.median(latencies), 1),
                # int(n * 0.95) < n for all n >= 1, so this index is safe.
                "p95_ms": round(ordered_latencies[int(len(ordered_latencies) * 0.95)], 1),
                "std_ms": round(statistics.stdev(latencies), 1) if len(latencies) > 1 else 0,
            },
            "tokens": {
                "mean_total": round(statistics.mean(token_means), 1) if token_means else 0,
                "total_used": sum(int(t) for t in token_means),
            },
            "accuracy": {
                "ocr_mean": round(statistics.mean(ocr_vals), 4) if ocr_vals else None,
                "ocr_count": len(ocr_vals),
                "keyword_mean": round(statistics.mean(kw_vals), 4) if kw_vals else None,
                "keyword_count": len(kw_vals),
            },
        }

    return summary
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Report generation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def to_markdown(report: dict) -> str:
    """Generate human-readable markdown report.

    Renders latency/accuracy/token tables, a failure-modes section, and a
    verdict that picks a winner by a weighted composite score (40% OCR
    accuracy, 30% keyword coverage, 30% success rate).

    Args:
        report: the dict produced by ``run_benchmark_suite``.

    Returns:
        The complete markdown document as one newline-joined string.
    """
    summary = report["summary"]
    config = report["config"]
    model_names = list(config["models"].values())

    lines = [
        "# Vision Benchmark Report",
        "",
        # [:16] trims the ISO timestamp to "YYYY-MM-DDTHH:MM".
        f"Generated: {report['generated_at'][:16]}",
        f"Images tested: {config['total_images']}",
        f"Runs per model: {config['runs_per_model']}",
        f"Models: {', '.join(model_names)}",
        "",
        "## Latency Comparison",
        "",
        "| Model | Mean (ms) | Median | P95 | Std Dev |",
        "|-------|-----------|--------|-----|---------|",
    ]

    for mkey, mname in config["models"].items():
        # Models whose runs all failed have no "latency" block; skip them.
        if mkey in summary and "latency" in summary[mkey]:
            lat = summary[mkey]["latency"]
            lines.append(
                f"| {mname} | {lat['mean_ms']:.0f} | {lat['median_ms']:.0f} | "
                f"{lat['p95_ms']:.0f} | {lat['std_ms']:.0f} |"
            )

    lines += [
        "",
        "## Accuracy Comparison",
        "",
        "| Model | OCR Accuracy | Keyword Coverage | Success Rate |",
        "|-------|-------------|-----------------|--------------|",
    ]

    for mkey, mname in config["models"].items():
        if mkey in summary and "accuracy" in summary[mkey]:
            acc = summary[mkey]["accuracy"]
            sr = summary[mkey].get("success_rate", 0)
            # Means are None when no test case supplied reference data.
            ocr = f"{acc['ocr_mean']:.1%}" if acc["ocr_mean"] is not None else "N/A"
            kw = f"{acc['keyword_mean']:.1%}" if acc["keyword_mean"] is not None else "N/A"
            lines.append(f"| {mname} | {ocr} | {kw} | {sr:.1%} |")

    lines += [
        "",
        "## Token Usage",
        "",
        "| Model | Mean Tokens/Image | Total Tokens |",
        "|-------|------------------|--------------|",
    ]

    for mkey, mname in config["models"].items():
        if mkey in summary and "tokens" in summary[mkey]:
            tok = summary[mkey]["tokens"]
            lines.append(
                f"| {mname} | {tok['mean_total']:.0f} | {tok['total_used']} |"
            )

    # Failure section: list distinct error examples per model, if any.
    lines += ["", "## Failure Modes", ""]
    had_failures = False
    for mkey, mname in config["models"].items():
        model_summary = summary.get(mkey, {})
        failure_examples = model_summary.get("failure_examples", [])
        if not failure_examples and not model_summary.get("error"):
            continue
        had_failures = True
        lines.append(f"### {mname}")
        if model_summary.get("error"):
            lines.append(f"- Summary: {model_summary['error']}")
        for err in failure_examples:
            lines.append(f"- {err}")
        lines.append("")
    if not had_failures:
        lines.append("- No provider/runtime failures recorded.")

    # Verdict
    lines += ["", "## Verdict", ""]

    # Find best model by composite score
    best_model = None
    best_score = -1
    for mkey, mname in config["models"].items():
        if mkey not in summary or "accuracy" not in summary[mkey]:
            continue
        acc = summary[mkey]["accuracy"]
        sr = summary[mkey].get("success_rate", 0)
        # None means count as 0 toward the composite.
        ocr = acc["ocr_mean"] or 0
        kw = acc["keyword_mean"] or 0
        # Weighted composite: 40% OCR, 30% keyword, 30% success rate
        score = (ocr * 0.4 + kw * 0.3 + sr * 0.3)
        if score > best_score:
            best_score = score
            best_model = mname

    if best_model:
        lines.append(f"**Best overall: {best_model}** (composite score: {best_score:.1%})")
        lines.append("")
        lines.append("Recommendation: keep the best-performing Gemma/Gemini lane from this run and only switch if repeated runs disagree.")
    else:
        lines.append("Benchmark blocked or insufficient data for a trustworthy winner.")
        lines.append("")
        lines.append("Recommendation: repair provider/runtime availability, rerun the benchmark, and keep the current implementation unchanged until comparative results exist.")

    return "\n".join(lines)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Test dataset management
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def generate_sample_dataset() -> List[dict]:
    """Generate a larger benchmark dataset aligned with issue #817.

    Builds 50 test cases — 10 screenshots, 10 diagrams, 10 photos, 10
    charts, 5 OCR text images, and 5 document-like images — so the harness
    matches the issue contract. OCR and document entries carry a known
    ground-truth string (the text baked into a dummyimage.com render);
    other categories rely on expected keywords and structure only.
    """
    dataset: List[dict] = []

    # (id suffix, image URL, expected keywords) for screenshot-style images.
    screenshots = [
        ("github_mark", "https://github.githubassets.com/images/modules/logos_page/GitHub-Mark.png", ["github", "logo", "mark"]),
        ("github_social", "https://github.githubassets.com/images/modules/site/social-cards.png", ["github", "page", "web"]),
        ("github_code_search", "https://github.githubassets.com/images/modules/site/features-code-search.png", ["search", "code", "feature"]),
        ("terminal_capture", "https://raw.githubusercontent.com/nicehash/nicehash-quick-start/main/images/nicehash-terminal.png", ["terminal", "command", "output"]),
        ("http_404", "https://http.cat/404.jpg", ["404", "error", "cat"]),
        ("dummy_cli_01", "https://dummyimage.com/1280x720/111827/f9fafb.png&text=Hermes+CLI+Session+01", ["hermes", "cli", "session"]),
        ("dummy_cli_02", "https://dummyimage.com/1280x720/0f172a/e2e8f0.png&text=Prompt+Cache+Dashboard", ["prompt", "cache", "dashboard"]),
        ("dummy_ui_01", "https://dummyimage.com/1280x720/1f2937/f3f4f6.png&text=Settings+Panel+Voice+Mode", ["settings", "voice", "mode"]),
        ("dummy_ui_02", "https://dummyimage.com/1280x720/334155/f8fafc.png&text=Browser+Vision+Preview", ["browser", "vision", "preview"]),
        ("dummy_ui_03", "https://dummyimage.com/1280x720/111827/ffffff.png&text=Tool+Call+Inspector", ["tool", "call", "inspector"]),
    ]
    for ident, url, keywords in screenshots:
        dataset.append({
            "id": f"screenshot_{ident}",
            "url": url,
            "category": "screenshot",
            "expected_keywords": keywords,
            "ground_truth_ocr": "",
            "expected_structure": {"min_length": 30, "min_sentences": 1, "has_numbers": False},
        })

    # Synthetic diagram renders (text-on-image placeholders).
    diagrams = [
        ("flow_a", "https://dummyimage.com/1200x800/f8fafc/0f172a.png&text=Flowchart+API+Gateway+Queue+Worker", ["flowchart", "api", "worker"]),
        ("flow_b", "https://dummyimage.com/1200x800/f1f5f9/0f172a.png&text=Architecture+Diagram+Database+Cache+Client", ["architecture", "diagram", "cache"]),
        ("uml_a", "https://dummyimage.com/1200x800/e2e8f0/0f172a.png&text=Class+Diagram+User+Session+Message", ["class", "diagram", "session"]),
        ("uml_b", "https://dummyimage.com/1200x800/cbd5e1/0f172a.png&text=Sequence+Diagram+Request+Response", ["sequence", "diagram", "response"]),
        ("network_a", "https://dummyimage.com/1200x800/ffffff/111827.png&text=Network+Nodes+Edges+Router", ["network", "node", "router"]),
        ("network_b", "https://dummyimage.com/1200x800/ffffff/1e293b.png&text=Service+Mesh+Proxy+Auth", ["service", "mesh", "auth"]),
        ("state_machine", "https://dummyimage.com/1200x800/f8fafc/334155.png&text=State+Machine+Idle+Run+Stop", ["state", "machine", "idle"]),
        ("mind_map", "https://dummyimage.com/1200x800/fefce8/1f2937.png&text=Mind+Map+Memory+Recall+Tools", ["mind", "memory", "tools"]),
        ("pipeline", "https://dummyimage.com/1200x800/ecfeff/155e75.png&text=Pipeline+Ingest+Rank+Summarize", ["pipeline", "ingest", "summarize"]),
        ("org_chart", "https://dummyimage.com/1200x800/fdf2f8/831843.png&text=Org+Chart+Lead+Review+Ops", ["org", "chart", "review"]),
    ]
    for ident, url, keywords in diagrams:
        dataset.append({
            "id": f"diagram_{ident}",
            "url": url,
            "category": "diagram",
            "expected_keywords": keywords,
            "ground_truth_ocr": "",
            "expected_structure": {"min_length": 50, "min_sentences": 2, "has_numbers": False},
        })

    # Random photos via picsum seeds; no keywords — structural scoring only.
    for idx in range(1, 11):
        dataset.append({
            "id": f"photo_random_{idx:02d}",
            "url": f"https://picsum.photos/seed/vision-bench-{idx}/640/480",
            "category": "photo",
            "expected_keywords": [],
            "ground_truth_ocr": "",
            "expected_structure": {"min_length": 30, "min_sentences": 1, "has_numbers": False},
        })

    # QuickChart-rendered charts; has_numbers=True since charts carry values.
    charts = [
        ("bar_quarterly", "https://quickchart.io/chart?c={type:'bar',data:{labels:['Q1','Q2','Q3','Q4'],datasets:[{label:'Revenue',data:[100,150,200,250]}]}}", ["bar", "chart", "revenue"]),
        ("pie_market", "https://quickchart.io/chart?c={type:'pie',data:{labels:['A','B','C'],datasets:[{data:[30,50,20]}]}}", ["pie", "chart", "percentage"]),
        ("line_temp", "https://quickchart.io/chart?c={type:'line',data:{labels:['Jan','Feb','Mar','Apr'],datasets:[{label:'Temperature',data:[5,8,12,18]}]}}", ["line", "chart", "temperature"]),
        ("radar_skill", "https://quickchart.io/chart?c={type:'radar',data:{labels:['Speed','Power','Defense','Magic'],datasets:[{label:'Hero',data:[80,60,70,90]}]}}", ["radar", "chart", "skill"]),
        ("stacked_cloud", "https://quickchart.io/chart?c={type:'bar',data:{labels:['2022','2023','2024'],datasets:[{label:'Cloud',data:[100,150,200]},{label:'On-prem',data:[200,180,160]}]},options:{scales:{x:{stacked:true},y:{stacked:true}}}}", ["stacked", "bar", "chart"]),
        ("area_growth", "https://quickchart.io/chart?c={type:'line',data:{labels:['W1','W2','W3','W4'],datasets:[{label:'Growth',data:[10,15,18,24],fill:true}]}}", ["line", "growth", "chart"]),
        ("scatter_eval", "https://quickchart.io/chart?c={type:'scatter',data:{datasets:[{label:'Runs',data:[{x:1,y:70},{x:2,y:75},{x:3,y:82}]}]}}", ["scatter", "chart", "runs"]),
        ("horizontal_bar", "https://quickchart.io/chart?c={type:'bar',data:{labels:['UI','OCR','Docs'],datasets:[{label:'Score',data:[88,76,91]}]},options:{indexAxis:'y'}}", ["bar", "score", "ocr"]),
        ("bubble_usage", "https://quickchart.io/chart?c={type:'bubble',data:{datasets:[{label:'Latency',data:[{x:1,y:120,r:8},{x:2,y:95,r:6},{x:3,y:180,r:10}]}]}}", ["bubble", "latency", "chart"]),
        ("doughnut_devices", "https://quickchart.io/chart?c={type:'doughnut',data:{labels:['Desktop','Mobile','Tablet'],datasets:[{data:[60,30,10]}]}}", ["doughnut", "chart", "device"]),
    ]
    for ident, url, keywords in charts:
        dataset.append({
            "id": f"chart_{ident}",
            "url": url,
            "category": "chart",
            "expected_keywords": keywords,
            "ground_truth_ocr": "",
            "expected_structure": {"min_length": 50, "min_sentences": 2, "has_numbers": True},
        })

    # OCR cases: the rendered text IS the ground truth.
    ocr_texts = [
        "Hermes OCR Alpha 01",
        "Prompt Cache Hit 87%",
        "Session 42 Ready",
        "Latency 118 ms",
        "Voice Mode Enabled",
    ]
    for idx, text in enumerate(ocr_texts, start=1):
        dataset.append({
            "id": f"ocr_text_{idx:02d}",
            "url": f"https://dummyimage.com/1200x320/ffffff/000000.png&text={text.replace(' ', '+')}",
            "category": "ocr",
            "expected_keywords": text.lower().split()[:2],
            "ground_truth_ocr": text,
            "expected_structure": {"min_length": 10, "min_sentences": 1, "has_numbers": any(ch.isdigit() for ch in text)},
        })

    # Document-style cases: same idea as OCR but with longer rendered text.
    documents = [
        "Invoice 1001 Total 42 Due 2026-04-22",
        "Form A Name Alice Status Approved",
        "Report Memory Recall Score 91 Percent",
        "Checklist Crisis Escalation Call 988 Now",
        "Meeting Notes Vision Benchmark Run Pending",
    ]
    for idx, text in enumerate(documents, start=1):
        dataset.append({
            "id": f"document_text_{idx:02d}",
            "url": f"https://dummyimage.com/1400x900/f8fafc/0f172a.png&text={text.replace(' ', '+')}",
            "category": "document",
            "expected_keywords": text.lower().split()[:3],
            "ground_truth_ocr": text,
            "expected_structure": {"min_length": 20, "min_sentences": 1, "has_numbers": any(ch.isdigit() for ch in text)},
        })

    return dataset
|
|
|
|
|
|
def load_dataset(path: str) -> List[dict]:
    """Load the test dataset from a JSON file.

    Args:
        path: filesystem path to a JSON array of image test-case dicts.

    Returns:
        The parsed list of test-case dicts.

    Raises:
        OSError: if the file cannot be opened.
        json.JSONDecodeError: if the contents are not valid JSON.
    """
    # Explicit UTF-8 so loading does not depend on the platform's locale
    # encoding (JSON is UTF-8 by specification, and the dataset may contain
    # non-ASCII text).
    with open(path, encoding="utf-8") as f:
        return json.load(f)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# CLI
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
async def main():
    """CLI entry point: parse arguments, run the benchmark, emit reports.

    Modes (mutually arranged by flags):
      --generate-dataset  write the built-in sample dataset and exit
      --url               benchmark a single image
      --images            benchmark a JSON dataset file
    Output goes to --output (JSON) and/or --markdown-output; markdown is
    printed to stdout when --markdown is set or no --output was given.
    """
    parser = argparse.ArgumentParser(description="Vision Benchmark Suite (Issue #817)")
    parser.add_argument("--images", help="Path to test images JSON file")
    parser.add_argument("--url", help="Single image URL to test")
    parser.add_argument("--category", default="photo", help="Category for single URL")
    parser.add_argument("--output", default=None, help="Output JSON file")
    parser.add_argument("--markdown-output", default=None, help="Optional markdown report output path")
    parser.add_argument("--runs", type=int, default=1, help="Runs per model per image")
    parser.add_argument("--limit", type=int, default=0, help="Limit to the first N images for smoke runs")
    parser.add_argument("--models", nargs="+", default=None,
                        help="Models to test (default: all)")
    parser.add_argument("--markdown", action="store_true", help="Output markdown report")
    parser.add_argument("--generate-dataset", action="store_true",
                        help="Generate sample dataset and exit")
    args = parser.parse_args()

    # Dataset-generation mode: write the sample dataset and exit early.
    if args.generate_dataset:
        dataset = generate_sample_dataset()
        out_path = args.images or "benchmarks/test_images.json"
        os.makedirs(os.path.dirname(out_path) or ".", exist_ok=True)
        with open(out_path, "w") as f:
            json.dump(dataset, f, indent=2)
        print(f"Generated sample dataset: {out_path} ({len(dataset)} images)")
        return

    # Select models (unknown names are silently dropped from the selection).
    if args.models:
        selected = {k: v for k, v in MODELS.items() if k in args.models}
    else:
        selected = MODELS

    # Load images: --url wins over --images; one of the two is required.
    if args.url:
        images = [{"id": "single", "url": args.url, "category": args.category}]
    elif args.images:
        images = load_dataset(args.images)
    else:
        print("ERROR: Provide --images or --url")
        sys.exit(1)

    if args.limit and args.limit > 0:
        images = images[:args.limit]

    # Run benchmark
    report = await run_benchmark_suite(images, selected, args.runs)

    markdown_report = to_markdown(report)

    # Output: JSON report, optional markdown file, then stdout markdown.
    if args.output:
        os.makedirs(os.path.dirname(args.output) or ".", exist_ok=True)
        with open(args.output, "w") as f:
            json.dump(report, f, indent=2)
        print(f"\nResults saved to {args.output}")

    if args.markdown_output:
        os.makedirs(os.path.dirname(args.markdown_output) or ".", exist_ok=True)
        with open(args.markdown_output, "w", encoding="utf-8") as f:
            f.write(markdown_report)
        print(f"Markdown report saved to {args.markdown_output}")

    if args.markdown or not args.output:
        print("\n" + markdown_report)
|
|
|
if __name__ == "__main__":
    # Script entry point: drive the async CLI on a fresh event loop.
    asyncio.run(main())
|