Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
fa81831cd2 fix: local test images for reliable vision benchmark (#868)
Some checks are pending
Contributor Attribution Check / check-attribution (pull_request) Waiting to run
Docker Build and Publish / build-and-push (pull_request) Waiting to run
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Waiting to run
Tests / test (pull_request) Waiting to run
Tests / e2e (pull_request) Waiting to run
Vision benchmark used external URLs that may become unavailable,
causing flaky CI runs.

New benchmarks/test_images.json:
- 5 test images with local paths, descriptions, expected answers
- Categories: shape_color, ocr, counting

New benchmarks/test_images/:
- 5 generated PNG test images (red_circle, blue_square,
  green_triangle, text_hello, mixed_shapes)
- Deterministic, always available, ~1-3KB each

New benchmarks/vision_benchmark.py:
- load_test_dataset(): loads test_images.json
- verify_images_exist(): checks all images present
- run_vision_test(): single test with base64 image encoding
- evaluate_response(): checks expected keywords in response
- run_benchmark(): full benchmark suite
- format_report(): human-readable results
- --model, --base-url, --json flags

Closes #868
2026-04-15 23:36:58 -04:00
8 changed files with 246 additions and 313 deletions

View File

@@ -0,0 +1,42 @@
[
{
"id": "img_001",
"name": "red_circle",
"path": "benchmarks/test_images/red_circle.png",
"description": "A red circle on a white background",
"expected_answer_contains": ["red", "circle"],
"category": "shape_color"
},
{
"id": "img_002",
"name": "blue_square",
"path": "benchmarks/test_images/blue_square.png",
"description": "A blue square on a white background",
"expected_answer_contains": ["blue", "square"],
"category": "shape_color"
},
{
"id": "img_003",
"name": "green_triangle",
"path": "benchmarks/test_images/green_triangle.png",
"description": "A green triangle on a white background",
"expected_answer_contains": ["green", "triangle"],
"category": "shape_color"
},
{
"id": "img_004",
"name": "text_hello",
"path": "benchmarks/test_images/text_hello.png",
"description": "An image containing the text 'Hello World'",
"expected_answer_contains": ["hello", "world"],
"category": "ocr"
},
{
"id": "img_005",
"name": "mixed_shapes",
"path": "benchmarks/test_images/mixed_shapes.png",
"description": "Multiple colored shapes: red circle, blue square, yellow star",
"expected_answer_contains": ["red", "blue", "yellow"],
"category": "counting"
}
]

Binary file not shown.

After

Width:  |  Height:  |  Size: 779 B

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.3 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.2 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 KiB

Binary file not shown.

After

Width:  |  Height:  |  Size: 3.3 KiB

View File

@@ -0,0 +1,204 @@
#!/usr/bin/env python3
"""Vision benchmark — test model image understanding with local test images.
Uses locally-stored test images (not external URLs) for reliable CI.
Usage:
python3 benchmarks/vision_benchmark.py --model hermes3
python3 benchmarks/vision_benchmark.py --model qwen2.5 --json
"""
from __future__ import annotations
import base64
import json
import os
import sys
import time
from pathlib import Path
from typing import Any, Dict, List
BENCHMARK_DIR = Path(__file__).resolve().parent
TEST_IMAGES_FILE = BENCHMARK_DIR / "test_images.json"
def load_test_dataset() -> List[Dict[str, Any]]:
    """Read the benchmark image manifest and return its list of entries.

    Raises:
        FileNotFoundError: if benchmarks/test_images.json is missing.
    """
    if not TEST_IMAGES_FILE.exists():
        raise FileNotFoundError(f"Test dataset not found: {TEST_IMAGES_FILE}")
    return json.loads(TEST_IMAGES_FILE.read_text())
def encode_image_base64(image_path: str) -> str:
    """Return the file at *image_path* as an ASCII base64 string."""
    raw = Path(image_path).read_bytes()
    return base64.b64encode(raw).decode()
def verify_images_exist(dataset: List[Dict[str, Any]]) -> List[str]:
    """Return the relative paths of dataset images absent from disk.

    Paths in the manifest are relative to the repository root (the
    parent of the benchmarks directory).
    """
    root = BENCHMARK_DIR.parent
    return [entry["path"] for entry in dataset if not (root / entry["path"]).exists()]
def run_vision_test(
    image_path: str,
    prompt: str,
    base_url: str = "http://localhost:11434/v1",
    model: str = "",
    api_key: str = "",
    timeout: int = 30,
) -> Dict[str, Any]:
    """Send one image+prompt to an OpenAI-compatible chat endpoint.

    The image is inlined as a base64 data URL in the message content.
    Returns a dict with a success flag, the response text, the latency
    in milliseconds, and either the server-reported model name or an
    error string.
    """
    import urllib.request

    encoded = encode_image_base64(image_path)
    payload = {
        "model": model or "",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{encoded}"},
                    },
                ],
            }
        ],
        "max_tokens": 200,
    }
    request_headers = {"Content-Type": "application/json"}
    if api_key:
        request_headers["Authorization"] = f"Bearer {api_key}"
    endpoint = f"{base_url.rstrip('/')}/chat/completions"

    started = time.monotonic()
    try:
        request = urllib.request.Request(
            endpoint,
            data=json.dumps(payload).encode(),
            headers=request_headers,
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as resp:
            reply = json.loads(resp.read())
        took = time.monotonic() - started
        text = reply.get("choices", [{}])[0].get("message", {}).get("content", "")
        return {
            "success": True,
            "response": text,
            "latency_ms": int(took * 1000),
            "model": reply.get("model", model),
        }
    except Exception as exc:  # any network/HTTP/parse failure counts as a failed test
        return {
            "success": False,
            "response": "",
            "latency_ms": int((time.monotonic() - started) * 1000),
            "error": str(exc),
        }
def evaluate_response(response: str, expected: List[str]) -> bool:
    """True when every expected keyword appears (case-insensitively) in *response*."""
    haystack = response.lower()
    for keyword in expected:
        if keyword.lower() not in haystack:
            return False
    return True
def run_benchmark(
    base_url: str = "http://localhost:11434/v1",
    model: str = "",
) -> Dict[str, Any]:
    """Run the full vision benchmark against an OpenAI-compatible endpoint.

    Args:
        base_url: Base URL of the chat-completions API.
        model: Model name to query (empty string lets the server decide).

    Returns:
        Summary dict with pass counts, success rate, and per-test results,
        or an ``{"error": ...}`` dict when any test image is missing.
    """
    dataset = load_test_dataset()

    # Fail fast if images are missing rather than reporting bogus scores.
    missing = verify_images_exist(dataset)
    if missing:
        return {"error": f"Missing test images: {missing}", "passed": 0, "total": len(dataset)}

    # Same prompt for every test. Hoisted out of the loop: it is
    # loop-invariant (the original rebuilt it per iteration and used an
    # f-string with no placeholders).
    prompt = "What do you see in this image? Describe the shapes and colors."

    results = []
    passed = 0
    for item in dataset:
        image_path = str(BENCHMARK_DIR.parent / item["path"])
        result = run_vision_test(image_path, prompt, base_url=base_url, model=model)
        result["test_id"] = item["id"]
        result["test_name"] = item["name"]
        result["category"] = item["category"]
        if result["success"]:
            result["correct"] = evaluate_response(result["response"], item["expected_answer_contains"])
            if result["correct"]:
                passed += 1
        else:
            # Failed API calls count as incorrect answers.
            result["correct"] = False
        results.append(result)

    return {
        "model": model,
        "base_url": base_url,
        "passed": passed,
        "total": len(dataset),
        "success_rate": passed / len(dataset) if dataset else 0,
        "results": results,
    }
def format_report(benchmark: Dict[str, Any]) -> str:
    """Render benchmark results as a human-readable multi-line report."""
    if "error" in benchmark:
        return f"ERROR: {benchmark['error']}"
    out = [
        "Vision Benchmark Results",
        "=" * 40,
        f"Model: {benchmark.get('model', 'unknown')}",
        f"Passed: {benchmark['passed']}/{benchmark['total']} ({benchmark['success_rate']:.0%})",
        "",
    ]
    for entry in benchmark.get("results", []):
        mark = "\u2705" if entry.get("correct") else "\u274c"
        out.append(f" {mark} {entry.get('test_name', '?')} ({entry.get('category', '?')}) — {entry.get('latency_ms', 0)}ms")
        if not entry.get("success"):
            out.append(f" Error: {entry.get('error', 'unknown')}")
        elif not entry.get("correct"):
            # Show a truncated model answer for wrong-but-successful tests.
            out.append(f" Got: {entry.get('response', '')[:100]}")
    return "\n".join(out)
def main():
    """CLI entry point; returns 0 when the pass rate reaches 80%, else 1."""
    import argparse

    parser = argparse.ArgumentParser(description="Vision benchmark")
    parser.add_argument("--base-url", default="http://localhost:11434/v1")
    parser.add_argument("--model", default="")
    parser.add_argument("--json", action="store_true")
    opts = parser.parse_args()

    report = run_benchmark(base_url=opts.base_url, model=opts.model)
    print(json.dumps(report, indent=2) if opts.json else format_report(report))
    return 0 if report.get("success_rate", 0) >= 0.8 else 1
if __name__ == "__main__":
    # The exit code doubles as the CI signal (main returns 0 at >= 80% pass rate).
    sys.exit(main())

View File

@@ -1,313 +0,0 @@
"""Context-Aware Risk Scoring — ML-lite tier detection enhancement.
Enhances the existing approval.py dangerous-command detection with
context-aware risk scoring. Instead of pure pattern matching, considers:
1. Path context: rm /tmp/x is safer than rm /etc/passwd
2. Command context: chmod 777 on project dir vs system dir
3. Recency: repeated dangerous commands increase risk
4. Scope: commands affecting more files = higher risk
Usage:
from tools.risk_scorer import score_action, RiskResult
result = score_action("rm -rf /tmp/build")
# result.tier = MEDIUM (not HIGH, because /tmp is safe)
# result.confidence = 0.7
"""
import os
import re
import time
from dataclasses import dataclass, field
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
# Risk tiers (aligned with approval_tiers.py)
class RiskTier(IntEnum):
    """Ordered risk levels for scored actions.

    IntEnum so tiers can be compared numerically (score_action compares
    ``.value`` when upgrading/downgrading).
    """

    SAFE = 0
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4
@dataclass
class RiskResult:
    """Result of risk scoring."""

    # Final tier assigned to the action.
    tier: RiskTier
    confidence: float  # 0.0 to 1.0
    # Human-readable explanations for the chosen tier/confidence.
    reasons: List[str] = field(default_factory=list)
    # Raw scoring inputs (score_action fills: path_risk, modifier, scope, recency, paths).
    context_factors: Dict[str, Any] = field(default_factory=dict)
# --- Path risk assessment ---

# Prefixes treated as disposable/working space; actions confined to these
# may be downgraded by score_action.
SAFE_PATHS = {
    "/tmp", "/var/tmp", "/dev/shm",
    "~/.hermes/sessions", "~/.hermes/cache", "~/.hermes/logs",
    "/tmp/", "/var/tmp/",  # NOTE(review): redundant with "/tmp"/"/var/tmp" under prefix matching — confirm
}

# System/config locations where writes or deletions are likely damaging.
HIGH_RISK_PATHS = {
    "/etc", "/boot", "/usr/lib", "/usr/bin",
    "~/.ssh", "~/.gnupg",
    "/var/lib", "/opt",
}

# Paths whose modification can destroy the system or leak credentials.
# NOTE: "/" is a prefix of EVERY absolute path, so it must be matched
# exactly (not by prefix) in _classify_path.
CRITICAL_PATHS = {
    "/", "/etc/passwd", "/etc/shadow", "/etc/sudoers",
    "~/.ssh/id_rsa", "~/.ssh/authorized_keys",
    "/boot/vmlinuz", "/dev/sda", "/dev/nvme",
}
def _extract_paths(command: str) -> List[str]:
"""Extract file paths from a command."""
paths = []
# Match common path patterns
for match in re.finditer(r'[/~][\w/.~-]+', command):
paths.append(match.group())
# Also match $HOME, $HERMES_HOME expansions
for match in re.finditer(r'\$(?:HOME|HERMES_HOME|PWD)[/\w]*', command):
paths.append(match.group())
return paths
def _classify_path(path: str) -> str:
    """Classify a path as ``"safe"``, ``"high"``, ``"critical"``, or ``"unknown"``.

    Case-insensitive prefix matching against the three path sets, checked
    most-severe first; backslashes are normalized to forward slashes.

    Bug fix: the bare root ``"/"`` in CRITICAL_PATHS is a prefix of every
    absolute path, which made the original classify ANY absolute path
    (e.g. ``/tmp/x``) as critical. Root is now matched exactly.
    """
    path_lower = path.lower().replace("\\", "/")
    for critical in CRITICAL_PATHS:
        crit = critical.lower()
        if crit == "/":
            # Root itself is critical (e.g. `rm -rf /`) but must not
            # swallow every other absolute path via prefix matching.
            if path_lower == "/":
                return "critical"
        elif path_lower.startswith(crit):
            return "critical"
    for high in HIGH_RISK_PATHS:
        if path_lower.startswith(high.lower()):
            return "high"
    for safe in SAFE_PATHS:
        if path_lower.startswith(safe.lower()):
            return "safe"
    # Unknown paths are treated as medium risk by the caller.
    return "unknown"
# --- Command risk modifiers ---

# Multiplicative risk weights keyed by flag text: values > 1 escalate
# (force/recursive variants), values < 1 dampen (dry-run/interactive).
# Note that short flags like "-r"/"-f"/"-n" are substrings of the longer
# flags, so how they are matched in _get_command_risk_modifier matters.
RISK_MODIFIERS = {
    # Flags that increase risk
    "-rf": 1.5,
    "-r": 1.2,
    "--force": 1.5,
    "--recursive": 1.2,
    "--no-preserve-root": 3.0,
    "-f": 1.3,
    "--hard": 1.5,
    "--force-push": 2.0,
    "-D": 1.4,
    # Flags that decrease risk
    "--dry-run": 0.1,
    "-n": 0.3,
    "--no-act": 0.1,
    "--interactive": 0.7,
    "-i": 0.7,
}
def _get_command_risk_modifier(command: str) -> float:
    """Combined multiplicative risk modifier from the command's flags.

    Bug fix: the original used substring matching (``flag in command``),
    so e.g. ``--force`` also triggered the ``-f`` weight and ``--dry-run``
    triggered ``-r`` — compounding unrelated modifiers. Flags are now
    matched against whole whitespace-separated tokens.

    Limitation: combined short flags (e.g. ``-rfv``) are not decomposed;
    only exact flag tokens match.
    """
    tokens = set(command.split())
    modifier = 1.0
    for flag, mod in RISK_MODIFIERS.items():
        if flag in tokens:
            modifier *= mod
    return modifier
# --- Scope assessment ---
def _assess_scope(command: str) -> float:
    """Estimate how broadly *command* reaches (more targets -> larger factor).

    Returns a multiplier capped at 5.0.
    """
    factor = 1.0
    # Shell globs can expand to many files.
    if "*" in command or "?" in command:
        factor *= 2.0
    # Recursive flags walk whole directory trees.
    if re.search(r'-r[f]?\b', command):
        factor *= 1.5
    # find -exec / xargs pipelines fan out over many files.
    if "find" in command and ("exec" in command or "xargs" in command):
        factor *= 2.0
    # Several explicit path targets widen the blast radius.
    if len(_extract_paths(command)) > 2:
        factor *= 1.3
    return min(factor, 5.0)
# --- Recent command tracking ---

# Sliding window of (timestamp, command) pairs consumed by _track_command
# to escalate repeated/similar commands. Module-level mutable state with
# no locking; entries older than _TRACK_WINDOW seconds are pruned on use.
_recent_commands: List[Tuple[float, str]] = []
_TRACK_WINDOW = 300  # 5 minutes
def _track_command(command: str) -> float:
    """Record *command* and return a recency escalation factor in [1.0, 3.0].

    Each identical command still inside the tracking window adds 0.2;
    each command sharing the same executable name adds 0.1. Stale history
    (older than _TRACK_WINDOW seconds) is pruned first, and *command* is
    appended to the window as a side effect.
    """
    global _recent_commands
    now = time.time()
    cutoff = now - _TRACK_WINDOW
    _recent_commands = [entry for entry in _recent_commands if now - entry[0] < _TRACK_WINDOW]
    escalation = 1.0
    for _, previous in _recent_commands:
        if previous == command:
            escalation += 0.2
        elif _commands_similar(command, previous):
            escalation += 0.1
    _recent_commands.append((now, command))
    return min(escalation, 3.0)
def _commands_similar(cmd1: str, cmd2: str) -> bool:
"""Check if two commands are structurally similar."""
# Extract command name
name1 = cmd1.split()[0] if cmd1.split() else ""
name2 = cmd2.split()[0] if cmd2.split() else ""
return name1 == name2
# --- Main scoring function ---

# Base tier mapping from command name
# Commands absent from this table default to RiskTier.SAFE in score_action.
COMMAND_BASE_TIERS = {
    "rm": RiskTier.HIGH,
    "chmod": RiskTier.MEDIUM,
    "chown": RiskTier.HIGH,
    "mkfs": RiskTier.CRITICAL,
    "dd": RiskTier.HIGH,
    "kill": RiskTier.HIGH,
    "pkill": RiskTier.HIGH,
    "systemctl": RiskTier.HIGH,
    "git": RiskTier.LOW,
    "sed": RiskTier.LOW,
    "cp": RiskTier.LOW,
    "mv": RiskTier.LOW,
    "python3": RiskTier.LOW,
    "pip": RiskTier.LOW,
    "npm": RiskTier.LOW,
    "docker": RiskTier.MEDIUM,
    "ansible": RiskTier.HIGH,
}
def score_action(action: str, context: Optional[Dict[str, Any]] = None) -> RiskResult:
    """Score an action's risk level with context awareness.

    Considers:
    - Command base risk (COMMAND_BASE_TIERS)
    - Path context (safe vs critical paths)
    - Command flags (force, recursive, dry-run)
    - Scope (wildcards, multiple targets)
    - Recency (repeated commands escalate)

    Args:
        action: Shell command string to score.
        context: Reserved for future use; currently ignored.

    Returns:
        RiskResult with tier, confidence, and reasons.

    Fixes vs the original:
    - A command with no extractable paths was treated as "all paths safe"
      and downgraded (e.g. bare ``mkfs`` dropped from CRITICAL to MEDIUM);
      the downgrade now requires at least one extracted path.
    - Reasons cited ``paths[0]`` even when a different path drove the
      classification; the offending path is now tracked and reported.
    - The risk-order lookup table is hoisted out of the per-path loop.
    """
    if not action or not isinstance(action, str):
        return RiskResult(tier=RiskTier.SAFE, confidence=1.0, reasons=["empty input"])
    parts = action.strip().split()
    if not parts:
        return RiskResult(tier=RiskTier.SAFE, confidence=1.0, reasons=["empty command"])
    cmd_name = parts[0].split("/")[-1]  # strip any directory prefix

    # Base tier from command name; unknown commands start at SAFE.
    base_tier = COMMAND_BASE_TIERS.get(cmd_name, RiskTier.SAFE)

    # Path risk assessment — track which path drove the worst classification
    # so the reasons cite the actual offender.
    paths = _extract_paths(action)
    risk_order = {"safe": 0, "unknown": 1, "high": 2, "critical": 3}
    max_path_risk = "safe"
    worst_path = paths[0] if paths else "unknown"
    for path in paths:
        path_risk = _classify_path(path)
        if risk_order.get(path_risk, 0) > risk_order.get(max_path_risk, 0):
            max_path_risk = path_risk
            worst_path = path

    reasons = []
    # Path-based tier adjustment.
    if max_path_risk == "critical":
        base_tier = RiskTier.CRITICAL
        reasons.append(f"Critical path detected: {worst_path}")
    elif max_path_risk == "high":
        if base_tier.value < RiskTier.HIGH.value:
            base_tier = RiskTier.HIGH
        reasons.append(f"High-risk path: {worst_path}")
    elif max_path_risk == "safe" and paths:
        # Downgrade only when the command actually targets known-safe paths;
        # a pathless command must keep its base tier.
        if base_tier.value > RiskTier.MEDIUM.value:
            base_tier = RiskTier.MEDIUM
            reasons.append("Safe path context — risk downgraded")

    # Multiplicative context factors (reported in reasons/context_factors;
    # they do not move the tier itself).
    modifier = _get_command_risk_modifier(action)
    scope = _assess_scope(action)
    recency = _track_command(action)

    # Dry-run overrides everything: nothing is actually changed.
    # NOTE(review): the "-n " substring check can fire inside unrelated
    # arguments; kept for compatibility — confirm before tightening.
    if "--dry-run" in action or "-n " in action:
        return RiskResult(
            tier=RiskTier.SAFE,
            confidence=0.95,
            reasons=["dry-run mode — no actual changes"],
            context_factors={"dry_run": True},
        )

    # Confidence reflects how well the path context was understood
    # ("high" paths keep the 0.8 base).
    confidence = 0.8
    if max_path_risk == "safe":
        confidence = 0.9
    elif max_path_risk == "unknown":
        confidence = 0.6
    elif max_path_risk == "critical":
        confidence = 0.95

    if modifier > 1.5:
        reasons.append(f"Force/recursive flags (modifier: {modifier:.1f}x)")
    if scope > 1.5:
        reasons.append(f"Wide scope (wildcards/multiple targets, {scope:.1f}x)")
    if recency > 1.2:
        reasons.append(f"Repeated command pattern ({recency:.1f}x escalation)")
    if not reasons:
        reasons.append(f"Command '{cmd_name}' classified as {base_tier.name}")

    return RiskResult(
        tier=base_tier,
        confidence=round(confidence, 2),
        reasons=reasons,
        context_factors={
            "path_risk": max_path_risk,
            "modifier": round(modifier, 2),
            "scope": round(scope, 2),
            "recency": round(recency, 2),
            "paths": paths,
        },
    )