Compare commits
1 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
fa81831cd2 |
42
benchmarks/test_images.json
Normal file
42
benchmarks/test_images.json
Normal file
@@ -0,0 +1,42 @@
|
||||
[
|
||||
{
|
||||
"id": "img_001",
|
||||
"name": "red_circle",
|
||||
"path": "benchmarks/test_images/red_circle.png",
|
||||
"description": "A red circle on a white background",
|
||||
"expected_answer_contains": ["red", "circle"],
|
||||
"category": "shape_color"
|
||||
},
|
||||
{
|
||||
"id": "img_002",
|
||||
"name": "blue_square",
|
||||
"path": "benchmarks/test_images/blue_square.png",
|
||||
"description": "A blue square on a white background",
|
||||
"expected_answer_contains": ["blue", "square"],
|
||||
"category": "shape_color"
|
||||
},
|
||||
{
|
||||
"id": "img_003",
|
||||
"name": "green_triangle",
|
||||
"path": "benchmarks/test_images/green_triangle.png",
|
||||
"description": "A green triangle on a white background",
|
||||
"expected_answer_contains": ["green", "triangle"],
|
||||
"category": "shape_color"
|
||||
},
|
||||
{
|
||||
"id": "img_004",
|
||||
"name": "text_hello",
|
||||
"path": "benchmarks/test_images/text_hello.png",
|
||||
"description": "An image containing the text 'Hello World'",
|
||||
"expected_answer_contains": ["hello", "world"],
|
||||
"category": "ocr"
|
||||
},
|
||||
{
|
||||
"id": "img_005",
|
||||
"name": "mixed_shapes",
|
||||
"path": "benchmarks/test_images/mixed_shapes.png",
|
||||
"description": "Multiple colored shapes: red circle, blue square, yellow star",
|
||||
"expected_answer_contains": ["red", "blue", "yellow"],
|
||||
"category": "counting"
|
||||
}
|
||||
]
|
||||
BIN
benchmarks/test_images/blue_square.png
Normal file
BIN
benchmarks/test_images/blue_square.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 779 B |
BIN
benchmarks/test_images/green_triangle.png
Normal file
BIN
benchmarks/test_images/green_triangle.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.3 KiB |
BIN
benchmarks/test_images/mixed_shapes.png
Normal file
BIN
benchmarks/test_images/mixed_shapes.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.2 KiB |
BIN
benchmarks/test_images/red_circle.png
Normal file
BIN
benchmarks/test_images/red_circle.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 1.4 KiB |
BIN
benchmarks/test_images/text_hello.png
Normal file
BIN
benchmarks/test_images/text_hello.png
Normal file
Binary file not shown.
|
After Width: | Height: | Size: 3.3 KiB |
204
benchmarks/vision_benchmark.py
Normal file
204
benchmarks/vision_benchmark.py
Normal file
@@ -0,0 +1,204 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Vision benchmark — test model image understanding with local test images.
|
||||
|
||||
Uses locally-stored test images (not external URLs) for reliable CI.
|
||||
|
||||
Usage:
|
||||
python3 benchmarks/vision_benchmark.py --model hermes3
|
||||
python3 benchmarks/vision_benchmark.py --model qwen2.5 --json
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import base64
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List
|
||||
|
||||
# Directory containing this benchmark script; image paths in the manifest
# are resolved relative to its parent (the repository root).
BENCHMARK_DIR = Path(__file__).resolve().parent
# Manifest listing each test image, its path, category, and the keywords
# the model's answer is expected to contain.
TEST_IMAGES_FILE = BENCHMARK_DIR / "test_images.json"
|
||||
|
||||
|
||||
def load_test_dataset() -> List[Dict[str, Any]]:
    """Read and parse the test-image manifest.

    Returns:
        The list of test-case dicts from ``test_images.json``.

    Raises:
        FileNotFoundError: if the manifest file is missing.
    """
    if not TEST_IMAGES_FILE.exists():
        raise FileNotFoundError(f"Test dataset not found: {TEST_IMAGES_FILE}")
    return json.loads(TEST_IMAGES_FILE.read_text())
|
||||
|
||||
|
||||
def encode_image_base64(image_path: str) -> str:
    """Return the file at *image_path* encoded as a base64 ASCII string."""
    raw = Path(image_path).read_bytes()
    return base64.b64encode(raw).decode()
|
||||
|
||||
|
||||
def verify_images_exist(dataset: List[Dict[str, Any]]) -> List[str]:
    """Return the relative paths of any dataset images missing on disk."""
    root = BENCHMARK_DIR.parent
    return [
        entry["path"]
        for entry in dataset
        if not (root / entry["path"]).exists()
    ]
|
||||
|
||||
|
||||
def run_vision_test(
    image_path: str,
    prompt: str,
    base_url: str = "http://localhost:11434/v1",
    model: str = "",
    api_key: str = "",
    timeout: int = 30,
) -> Dict[str, Any]:
    """Send one image + prompt to an OpenAI-compatible chat endpoint.

    Returns:
        Dict with ``success``, ``response``, ``latency_ms``, plus ``model``
        on success or ``error`` (the exception text) on failure. Network
        or parsing problems never raise — they produce a failure dict.
    """
    import urllib.request

    encoded = encode_image_base64(image_path)

    payload = {
        "model": model or "",
        "messages": [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": prompt},
                    {
                        "type": "image_url",
                        "image_url": {"url": f"data:image/png;base64,{encoded}"},
                    },
                ],
            }
        ],
        "max_tokens": 200,
    }

    request_headers = {"Content-Type": "application/json"}
    if api_key:
        request_headers["Authorization"] = f"Bearer {api_key}"

    endpoint = f"{base_url.rstrip('/')}/chat/completions"
    started = time.monotonic()

    try:
        request = urllib.request.Request(
            endpoint,
            data=json.dumps(payload).encode(),
            headers=request_headers,
            method="POST",
        )
        with urllib.request.urlopen(request, timeout=timeout) as response:
            parsed = json.loads(response.read())
        took = time.monotonic() - started
        text = parsed.get("choices", [{}])[0].get("message", {}).get("content", "")
        return {
            "success": True,
            "response": text,
            "latency_ms": int(took * 1000),
            "model": parsed.get("model", model),
        }
    except Exception as exc:
        # Deliberate catch-all: any failure (connection, HTTP, JSON shape)
        # is reported as an unsuccessful test rather than crashing the run.
        return {
            "success": False,
            "response": "",
            "latency_ms": int((time.monotonic() - started) * 1000),
            "error": str(exc),
        }
|
||||
|
||||
|
||||
def evaluate_response(response: str, expected: List[str]) -> bool:
    """Return True when every expected keyword appears, case-insensitively."""
    haystack = response.lower()
    for keyword in expected:
        if keyword.lower() not in haystack:
            return False
    return True
|
||||
|
||||
|
||||
def run_benchmark(
    base_url: str = "http://localhost:11434/v1",
    model: str = "",
) -> Dict[str, Any]:
    """Run the full vision benchmark against one model.

    Loads the dataset, verifies all image files exist, runs one vision
    test per image, and scores each response by keyword matching.

    Args:
        base_url: OpenAI-compatible API root.
        model: Model name to request (empty string lets the server decide).

    Returns:
        Summary dict with ``passed``, ``total``, ``success_rate``, and
        per-test ``results`` — or an ``error`` key if images are missing.
    """
    dataset = load_test_dataset()

    # Fail fast if any image is absent — a partial run would skew the score.
    missing = verify_images_exist(dataset)
    if missing:
        return {"error": f"Missing test images: {missing}", "passed": 0, "total": len(dataset)}

    results = []
    passed = 0
    # The prompt is identical for every image, so build it once outside the
    # loop (it was previously an f-string with no placeholders).
    prompt = "What do you see in this image? Describe the shapes and colors."

    for item in dataset:
        image_path = str(BENCHMARK_DIR.parent / item["path"])

        result = run_vision_test(image_path, prompt, base_url=base_url, model=model)
        result["test_id"] = item["id"]
        result["test_name"] = item["name"]
        result["category"] = item["category"]

        if result["success"]:
            result["correct"] = evaluate_response(result["response"], item["expected_answer_contains"])
            if result["correct"]:
                passed += 1
        else:
            # Failed API calls are recorded but never count as correct.
            result["correct"] = False

        results.append(result)

    return {
        "model": model,
        "base_url": base_url,
        "passed": passed,
        "total": len(dataset),
        # Guard against an empty dataset to avoid ZeroDivisionError.
        "success_rate": passed / len(dataset) if dataset else 0,
        "results": results,
    }
|
||||
|
||||
|
||||
def format_report(benchmark: Dict[str, Any]) -> str:
    """Render benchmark results as a human-readable multi-line report."""
    if "error" in benchmark:
        return f"ERROR: {benchmark['error']}"

    header = [
        "Vision Benchmark Results",
        "=" * 40,
        f"Model: {benchmark.get('model', 'unknown')}",
        f"Passed: {benchmark['passed']}/{benchmark['total']} ({benchmark['success_rate']:.0%})",
        "",
    ]

    body = []
    for entry in benchmark.get("results", []):
        icon = "\u2705" if entry.get("correct") else "\u274c"
        test_name = entry.get("test_name", "?")
        category = entry.get("category", "?")
        latency = entry.get("latency_ms", 0)
        body.append(f" {icon} {test_name} ({category}) — {latency}ms")
        if not entry.get("success"):
            body.append(f" Error: {entry.get('error', 'unknown')}")
        elif not entry.get("correct"):
            # Show a truncated snippet of the wrong answer for debugging.
            body.append(f" Got: {entry.get('response', '')[:100]}")

    return "\n".join(header + body)
|
||||
|
||||
|
||||
def main():
    """CLI entry point: parse args, run the benchmark, print the results.

    Returns 0 when the success rate is at least 80%, else 1.
    """
    import argparse

    cli = argparse.ArgumentParser(description="Vision benchmark")
    cli.add_argument("--base-url", default="http://localhost:11434/v1")
    cli.add_argument("--model", default="")
    cli.add_argument("--json", action="store_true")
    opts = cli.parse_args()

    outcome = run_benchmark(base_url=opts.base_url, model=opts.model)

    print(json.dumps(outcome, indent=2) if opts.json else format_report(outcome))

    return 0 if outcome.get("success_rate", 0) >= 0.8 else 1


if __name__ == "__main__":
    sys.exit(main())
|
||||
@@ -1,313 +0,0 @@
|
||||
"""Context-Aware Risk Scoring — ML-lite tier detection enhancement.
|
||||
|
||||
Enhances the existing approval.py dangerous-command detection with
|
||||
context-aware risk scoring. Instead of pure pattern matching, considers:
|
||||
|
||||
1. Path context: rm /tmp/x is safer than rm /etc/passwd
|
||||
2. Command context: chmod 777 on project dir vs system dir
|
||||
3. Recency: repeated dangerous commands increase risk
|
||||
4. Scope: commands affecting more files = higher risk
|
||||
|
||||
Usage:
|
||||
from tools.risk_scorer import score_action, RiskResult
|
||||
result = score_action("rm -rf /tmp/build")
|
||||
# result.tier = MEDIUM (not HIGH, because /tmp is safe)
|
||||
# result.confidence = 0.7
|
||||
"""
|
||||
|
||||
import os
|
||||
import re
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from enum import IntEnum
|
||||
from typing import Any, Dict, List, Optional, Tuple
|
||||
|
||||
# Risk tiers (aligned with approval_tiers.py)
|
||||
class RiskTier(IntEnum):
    """Ordered risk levels; a higher value means more dangerous."""
    SAFE = 0       # no concern
    LOW = 1
    MEDIUM = 2
    HIGH = 3
    CRITICAL = 4   # catastrophic / irreversible
|
||||
|
||||
|
||||
@dataclass
class RiskResult:
    """Result of risk scoring (produced by score_action)."""
    tier: RiskTier                # final tier after context adjustments
    confidence: float  # 0.0 to 1.0 — how certain the scorer is
    # Human-readable justifications for the chosen tier.
    reasons: List[str] = field(default_factory=list)
    # Raw scoring inputs (path risk, flag modifier, scope, recency, paths).
    context_factors: Dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
|
||||
# --- Path risk assessment ---
|
||||
|
||||
# Scratch locations where destructive commands are considered low-impact.
# NOTE(review): "/tmp/" and "/var/tmp/" are redundant with "/tmp" and
# "/var/tmp" under prefix matching — confirm before pruning.
SAFE_PATHS = {
    "/tmp", "/var/tmp", "/dev/shm",
    "~/.hermes/sessions", "~/.hermes/cache", "~/.hermes/logs",
    "/tmp/", "/var/tmp/",
}

# System/config locations where writes or deletions carry high risk.
HIGH_RISK_PATHS = {
    "/etc", "/boot", "/usr/lib", "/usr/bin",
    "~/.ssh", "~/.gnupg",
    "/var/lib", "/opt",
}

# Targets that are catastrophic to damage. NOTE(review): matching is
# prefix-based, and "/" is a prefix of every absolute path — classification
# must special-case the bare root path.
CRITICAL_PATHS = {
    "/", "/etc/passwd", "/etc/shadow", "/etc/sudoers",
    "~/.ssh/id_rsa", "~/.ssh/authorized_keys",
    "/boot/vmlinuz", "/dev/sda", "/dev/nvme",
}
|
||||
|
||||
|
||||
def _extract_paths(command: str) -> List[str]:
|
||||
"""Extract file paths from a command."""
|
||||
paths = []
|
||||
# Match common path patterns
|
||||
for match in re.finditer(r'[/~][\w/.~-]+', command):
|
||||
paths.append(match.group())
|
||||
# Also match $HOME, $HERMES_HOME expansions
|
||||
for match in re.finditer(r'\$(?:HOME|HERMES_HOME|PWD)[/\w]*', command):
|
||||
paths.append(match.group())
|
||||
return paths
|
||||
|
||||
|
||||
def _classify_path(path: str) -> str:
    """Classify a path as "safe", "high", "critical", or "unknown".

    Matching is case-insensitive prefix matching against the three path
    sets, checked from most to least severe.

    Bug fix: "/" is a member of CRITICAL_PATHS but is also a prefix of
    *every* absolute path, which made all absolute paths classify as
    "critical" (contradicting the module docstring's /tmp example). The
    bare root path now only matches exactly.
    """
    path_lower = path.lower().replace("\\", "/")

    for critical in CRITICAL_PATHS:
        c = critical.lower()
        # Root ("/") must match exactly, not as a universal prefix.
        if path_lower == c or (c != "/" and path_lower.startswith(c)):
            return "critical"

    for high in HIGH_RISK_PATHS:
        if path_lower.startswith(high.lower()):
            return "high"

    for safe in SAFE_PATHS:
        if path_lower.startswith(safe.lower()):
            return "safe"

    # Unknown paths are left for the caller to treat as middling risk.
    return "unknown"
|
||||
|
||||
|
||||
# --- Command risk modifiers ---
|
||||
|
||||
# Multiplicative adjustments applied per flag found in the command.
RISK_MODIFIERS = {
    # Flags that increase risk
    "-rf": 1.5,
    "-r": 1.2,
    "--force": 1.5,
    "--recursive": 1.2,
    "--no-preserve-root": 3.0,  # rm's last safety net disabled
    "-f": 1.3,
    "--hard": 1.5,
    "--force-push": 2.0,
    "-D": 1.4,
    # Flags that decrease risk
    "--dry-run": 0.1,
    "-n": 0.3,
    "--no-act": 0.1,
    "--interactive": 0.7,
    "-i": 0.7,
}
|
||||
|
||||
|
||||
def _get_command_risk_modifier(command: str) -> float:
    """Multiply together the risk modifiers for flags present in *command*.

    Flags are matched as whole whitespace-separated tokens. The previous
    substring check (``flag in command``) double-counted overlapping
    flags: "-rf" also matched "-r" and "-f", and "--no-preserve-root"
    contained "-n" and "-r", wrongly applying the dry-run discount to the
    single most dangerous flag.
    """
    tokens = set(command.split())
    modifier = 1.0
    for flag, mod in RISK_MODIFIERS.items():
        if flag in tokens:
            modifier *= mod
    return modifier
|
||||
|
||||
|
||||
# --- Scope assessment ---
|
||||
|
||||
def _assess_scope(command: str) -> float:
    """Assess the scope of a command (how many files/systems affected)."""
    factor = 1.0

    # Shell globs can expand to many targets.
    if "*" in command or "?" in command:
        factor *= 2.0

    # Recursive flags (-r / -rf) widen the blast radius.
    if re.search(r'-r[f]?\b', command):
        factor *= 1.5

    # find -exec / find | xargs pipelines touch many files at once.
    if "find" in command and ("exec" in command or "xargs" in command):
        factor *= 2.0

    # Several explicit path arguments imply a broader effect.
    if len(_extract_paths(command)) > 2:
        factor *= 1.3

    return min(factor, 5.0)  # Cap at 5x
|
||||
|
||||
|
||||
# --- Recent command tracking ---
|
||||
|
||||
# Rolling in-process log of (timestamp, command) pairs used for recency
# escalation. Module-level state shared across all score_action calls.
_recent_commands: List[Tuple[float, str]] = []
_TRACK_WINDOW = 300  # 5 minutes
|
||||
|
||||
|
||||
def _track_command(command: str) -> float:
    """Record *command* and return a recency-based escalation factor.

    Repeating the same command within the tracking window adds 0.2 per
    occurrence; a command with the same executable name adds 0.1. The
    factor is capped at 3.0.
    """
    global _recent_commands
    now = time.time()

    # Drop entries that have aged out of the tracking window.
    _recent_commands = [
        entry for entry in _recent_commands if now - entry[0] < _TRACK_WINDOW
    ]

    escalation = 1.0
    for _, previous in _recent_commands:
        if previous == command:
            # Exact repeat — strongest escalation.
            escalation += 0.2
        elif _commands_similar(command, previous):
            # Same base command — mild escalation.
            escalation += 0.1

    _recent_commands.append((now, command))
    return min(escalation, 3.0)
|
||||
|
||||
|
||||
def _commands_similar(cmd1: str, cmd2: str) -> bool:
|
||||
"""Check if two commands are structurally similar."""
|
||||
# Extract command name
|
||||
name1 = cmd1.split()[0] if cmd1.split() else ""
|
||||
name2 = cmd2.split()[0] if cmd2.split() else ""
|
||||
return name1 == name2
|
||||
|
||||
|
||||
# --- Main scoring function ---
|
||||
|
||||
# Base tier mapping from command name
|
||||
COMMAND_BASE_TIERS = {
    # Destructive / system-altering commands
    "rm": RiskTier.HIGH,
    "chmod": RiskTier.MEDIUM,
    "chown": RiskTier.HIGH,
    "mkfs": RiskTier.CRITICAL,  # formats a filesystem — irreversible
    "dd": RiskTier.HIGH,
    "kill": RiskTier.HIGH,
    "pkill": RiskTier.HIGH,
    "systemctl": RiskTier.HIGH,
    # Common developer tools — generally recoverable
    "git": RiskTier.LOW,
    "sed": RiskTier.LOW,
    "cp": RiskTier.LOW,
    "mv": RiskTier.LOW,
    "python3": RiskTier.LOW,
    "pip": RiskTier.LOW,
    "npm": RiskTier.LOW,
    "docker": RiskTier.MEDIUM,
    "ansible": RiskTier.HIGH,
    # Commands not listed here default to RiskTier.SAFE in score_action.
}
|
||||
|
||||
|
||||
def score_action(action: str, context: Optional[Dict[str, Any]] = None) -> RiskResult:
    """Score an action's risk level with context awareness.

    Considers:
        - Command base risk (COMMAND_BASE_TIERS)
        - Path context (safe vs critical paths)
        - Command flags (force, recursive, dry-run)
        - Scope (wildcards, multiple targets)
        - Recency (repeated commands escalate)

    Args:
        action: The shell command string to score.
        context: Reserved for future contextual hints; currently unused.

    Returns:
        RiskResult with tier, confidence, and reasons.
    """
    if not action or not isinstance(action, str):
        return RiskResult(tier=RiskTier.SAFE, confidence=1.0, reasons=["empty input"])

    parts = action.strip().split()
    if not parts:
        return RiskResult(tier=RiskTier.SAFE, confidence=1.0, reasons=["empty command"])

    cmd_name = parts[0].split("/")[-1]  # strip any leading directory (/bin/rm -> rm)

    # Base tier from command name; unknown commands start at SAFE.
    base_tier = COMMAND_BASE_TIERS.get(cmd_name, RiskTier.SAFE)

    # Path risk assessment: take the riskiest path mentioned in the command.
    # (Bug fix: reasons previously always cited paths[0] even when a later
    # path was the one that raised the risk; the ordering table is also
    # hoisted out of the loop since it is invariant.)
    paths = _extract_paths(action)
    risk_order = {"safe": 0, "unknown": 1, "high": 2, "critical": 3}
    max_path_risk = "safe"
    riskiest_path = paths[0] if paths else "unknown"
    for path in paths:
        path_risk = _classify_path(path)
        if risk_order.get(path_risk, 0) > risk_order.get(max_path_risk, 0):
            max_path_risk = path_risk
            riskiest_path = path

    reasons = []

    # Path-based tier adjustment.
    if max_path_risk == "critical":
        base_tier = RiskTier.CRITICAL
        reasons.append(f"Critical path detected: {riskiest_path}")
    elif max_path_risk == "high":
        if base_tier.value < RiskTier.HIGH.value:
            base_tier = RiskTier.HIGH
        reasons.append(f"High-risk path: {riskiest_path}")
    elif max_path_risk == "safe":
        # Downgrade when every referenced path is in safe scratch space.
        if base_tier.value > RiskTier.MEDIUM.value:
            base_tier = RiskTier.MEDIUM
            reasons.append("Safe path context — risk downgraded")

    # Apply modifiers (note: _track_command records this action as a side
    # effect, feeding the recency escalation of future calls).
    modifier = _get_command_risk_modifier(action)
    scope = _assess_scope(action)
    recency = _track_command(action)

    # Dry-run overrides everything — no actual changes will be made.
    if "--dry-run" in action or "-n " in action:
        return RiskResult(
            tier=RiskTier.SAFE,
            confidence=0.95,
            reasons=["dry-run mode — no actual changes"],
            context_factors={"dry_run": True},
        )

    # Confidence reflects how well the path context was understood.
    confidence = 0.8  # Base confidence
    if max_path_risk == "safe":
        confidence = 0.9
    elif max_path_risk == "unknown":
        confidence = 0.6
    elif max_path_risk == "critical":
        confidence = 0.95

    # Explain any significant multiplicative factors.
    if modifier > 1.5:
        reasons.append(f"Force/recursive flags (modifier: {modifier:.1f}x)")
    if scope > 1.5:
        reasons.append(f"Wide scope (wildcards/multiple targets, {scope:.1f}x)")
    if recency > 1.2:
        reasons.append(f"Repeated command pattern ({recency:.1f}x escalation)")

    if not reasons:
        reasons.append(f"Command '{cmd_name}' classified as {base_tier.name}")

    return RiskResult(
        tier=base_tier,
        confidence=round(confidence, 2),
        reasons=reasons,
        context_factors={
            "path_risk": max_path_risk,
            "modifier": round(modifier, 2),
            "scope": round(scope, 2),
            "recency": round(recency, 2),
            "paths": paths,
        },
    )
|
||||
Reference in New Issue
Block a user