Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
f8f4678ee4 feat: benchmark local Ollama models against 50 tok/s threshold (#287)
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 1m24s
Add scripts/benchmark_local_models.py — tests all local Ollama models
against the 50 tok/s UX threshold (configurable via --threshold).

Features:
- Auto-discovers all pulled Ollama models or test specific ones
- Configurable rounds, max tokens, threshold
- Per-round timing with prompt_eval/eval token breakdown
- Human-readable table report with PASS/FAIL/ERROR status
- JSON output mode (--json) for CI integration
- Exit code 1 if any model fails threshold

Usage:
  python3 scripts/benchmark_local_models.py                 # all models, 3 rounds
  python3 scripts/benchmark_local_models.py --models qwen2.5:7b  # single model
  python3 scripts/benchmark_local_models.py --json          # CI output
  python3 scripts/benchmark_local_models.py --threshold 30  # custom threshold

Tested: gemma3:1b scores 141.8 tok/s (PASS).

Closes #287
2026-04-13 17:46:53 -04:00
4 changed files with 286 additions and 248 deletions

View File

@@ -1,11 +1,10 @@
"""Helpers for optional cheap-vs-strong and time-aware model routing."""
"""Helpers for optional cheap-vs-strong model routing."""
from __future__ import annotations
import os
import re
from datetime import datetime
from typing import Any, Dict, List, Optional
from typing import Any, Dict, Optional
from utils import is_truthy_value
@@ -193,104 +192,3 @@ def resolve_turn_route(user_message: str, routing_config: Optional[Dict[str, Any
tuple(runtime.get("args") or ()),
),
}
# =========================================================================
# Time-aware cron model routing
# =========================================================================
#
# Empirical finding: cron error rate peaks at 18:00 (9.4%) vs 4.0% at 09:00.
# During high-error windows, route cron jobs to more capable models.
#
# Config (config.yaml):
# cron_model_routing:
# enabled: true
# fallback_model: "anthropic/claude-sonnet-4"
# fallback_provider: "openrouter"
# windows:
# - start_hour: 17
# end_hour: 22
# reason: "evening_error_peak"
# - start_hour: 2
# end_hour: 5
# reason: "overnight_api_instability"
# =========================================================================
def _hour_in_window(hour: int, start: int, end: int) -> bool:
"""Check if hour falls in [start, end) window, handling midnight wrap."""
if start <= end:
return start <= hour < end
else:
# Wraps midnight: e.g., 22-06
return hour >= start or hour < end
def resolve_cron_model(
base_model: str,
routing_config: Optional[Dict[str, Any]],
now: Optional[datetime] = None,
) -> Dict[str, Any]:
"""Apply time-aware model override for cron jobs.
During configured high-error windows, returns a stronger model config.
Outside windows, returns the base model unchanged.
Args:
base_model: The model string already resolved (from job/config/env).
routing_config: The cron_model_routing dict from config.yaml.
now: Override current time (for testing). Defaults to datetime.now().
Returns:
Dict with keys: model, provider, overridden, reason.
- model: the effective model string to use
- provider: provider override (empty string = use default)
- overridden: True if time-based override was applied
- reason: why override was applied (empty string if not)
"""
cfg = routing_config or {}
if not _coerce_bool(cfg.get("enabled"), False):
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
windows = cfg.get("windows") or []
if not isinstance(windows, list) or not windows:
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
current = now or datetime.now()
current_hour = current.hour
matched_window = None
for window in windows:
if not isinstance(window, dict):
continue
start = _coerce_int(window.get("start_hour"), -1)
end = _coerce_int(window.get("end_hour"), -1)
if start < 0 or end < 0:
continue
if _hour_in_window(current_hour, start, end):
matched_window = window
break
if not matched_window:
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
# Window matched — use the override model from window or global fallback
override_model = str(matched_window.get("model") or "").strip()
override_provider = str(matched_window.get("provider") or "").strip()
if not override_model:
override_model = str(cfg.get("fallback_model") or "").strip()
if not override_provider:
override_provider = str(cfg.get("fallback_provider") or "").strip()
if not override_model:
return {"model": base_model, "provider": "", "overridden": False, "reason": ""}
reason = str(matched_window.get("reason") or "time_window").strip()
return {
"model": override_model,
"provider": override_provider,
"overridden": True,
"reason": f"cron_routing:{reason}(hour={current_hour})",
}

View File

@@ -717,22 +717,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
# Reasoning config from env or config.yaml
from hermes_constants import parse_reasoning_effort
# Time-aware cron model routing — override model during high-error windows
try:
from agent.smart_model_routing import resolve_cron_model
_cron_routing_cfg = (_cfg.get("cron_model_routing") or {})
_cron_route = resolve_cron_model(model, _cron_routing_cfg)
if _cron_route["overridden"]:
_original_model = model
model = _cron_route["model"]
logger.info(
"Job '%s': cron model override %s -> %s (%s)",
job_id, _original_model, model, _cron_route["reason"],
)
except Exception as _e:
logger.debug("Job '%s': cron model routing skipped: %s", job_id, _e)
effort = os.getenv("HERMES_REASONING_EFFORT", "")
if not effort:
effort = str(_cfg.get("agent", {}).get("reasoning_effort", "")).strip()

View File

@@ -0,0 +1,284 @@
#!/usr/bin/env python3
"""
Benchmark local Ollama models against the 50 tok/s UX threshold.
Usage:
python3 scripts/benchmark_local_models.py [--models MODEL1,MODEL2] [--prompt PROMPT] [--rounds N]
python3 scripts/benchmark_local_models.py --all # test all pulled models
python3 scripts/benchmark_local_models.py --json # JSON output for CI
"""
import argparse
import json
import os
import sys
import time
import urllib.request
import urllib.error
from dataclasses import dataclass, asdict
from typing import Optional
OLLAMA_BASE = os.environ.get("OLLAMA_BASE_URL", "http://localhost:11434")
THRESHOLD_TOK_S = 50.0
BENCHMARK_PROMPT = (
"Explain the difference between TCP and UDP protocols. "
"Cover reliability, ordering, speed, and use cases. "
"Be thorough but concise. Write at least 300 words."
)
@dataclass
class BenchmarkResult:
model: str
size_gb: float
prompt_tokens: int
eval_tokens: int
eval_duration_s: float
tokens_per_second: float
total_duration_s: float
rounds: int
avg_tok_s: float
meets_threshold: bool
error: Optional[str] = None
def get_models() -> list[dict]:
"""List all pulled Ollama models."""
url = f"{OLLAMA_BASE}/api/tags"
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=10) as resp:
data = json.loads(resp.read())
return data.get("models", [])
except Exception as e:
print(f"Error connecting to Ollama at {OLLAMA_BASE}: {e}", file=sys.stderr)
sys.exit(1)
def benchmark_model(model: str, prompt: str, num_predict: int = 512) -> dict:
"""Run a single benchmark generation, return timing stats."""
url = f"{OLLAMA_BASE}/api/generate"
payload = json.dumps({
"model": model,
"prompt": prompt,
"stream": False,
"options": {
"num_predict": num_predict,
"temperature": 0.1, # low temp for consistent output
},
}).encode()
req = urllib.request.Request(url, data=payload, method="POST")
req.add_header("Content-Type", "application/json")
start = time.monotonic()
try:
with urllib.request.urlopen(req, timeout=300) as resp:
data = json.loads(resp.read())
except urllib.error.HTTPError as e:
body = e.read().decode() if e.fp else str(e)
raise RuntimeError(f"HTTP {e.code}: {body[:200]}")
except Exception as e:
raise RuntimeError(str(e))
elapsed = time.monotonic() - start
prompt_tokens = data.get("prompt_eval_count", 0)
eval_tokens = data.get("eval_count", 0)
eval_duration_ns = data.get("eval_duration", 0)
total_duration_ns = data.get("total_duration", 0)
eval_duration_s = eval_duration_ns / 1e9 if eval_duration_ns else elapsed
total_duration_s = total_duration_ns / 1e9 if total_duration_ns else elapsed
tok_s = eval_tokens / eval_duration_s if eval_duration_s > 0 else 0.0
return {
"prompt_tokens": prompt_tokens,
"eval_tokens": eval_tokens,
"eval_duration_s": round(eval_duration_s, 2),
"total_duration_s": round(total_duration_s, 2),
"tokens_per_second": round(tok_s, 1),
}
def run_benchmark(
model_name: str,
model_size: float,
prompt: str,
rounds: int,
num_predict: int,
threshold: float = 50.0,
) -> BenchmarkResult:
"""Run multiple rounds and compute average."""
results = []
errors = []
for i in range(rounds):
try:
r = benchmark_model(model_name, prompt, num_predict)
results.append(r)
print(f" Round {i+1}/{rounds}: {r['tokens_per_second']} tok/s "
f"({r['eval_tokens']} tokens in {r['eval_duration_s']}s)")
except Exception as e:
errors.append(str(e))
print(f" Round {i+1}/{rounds}: ERROR - {e}")
if not results:
return BenchmarkResult(
model=model_name,
size_gb=model_size,
prompt_tokens=0, eval_tokens=0,
eval_duration_s=0, tokens_per_second=0,
total_duration_s=0, rounds=rounds,
avg_tok_s=0, meets_threshold=False,
error="; ".join(errors),
)
avg_tok_s = sum(r["tokens_per_second"] for r in results) / len(results)
avg_tok_s = round(avg_tok_s, 1)
return BenchmarkResult(
model=model_name,
size_gb=model_size,
prompt_tokens=sum(r["prompt_tokens"] for r in results) // len(results),
eval_tokens=sum(r["eval_tokens"] for r in results) // len(results),
eval_duration_s=round(sum(r["eval_duration_s"] for r in results) / len(results), 2),
tokens_per_second=avg_tok_s,
total_duration_s=round(sum(r["total_duration_s"] for r in results) / len(results), 2),
rounds=len(results),
avg_tok_s=avg_tok_s,
meets_threshold=avg_tok_s >= threshold,
)
def format_report(results: list[BenchmarkResult], threshold: float = 50.0) -> str:
"""Format a human-readable benchmark report."""
lines = []
lines.append("")
lines.append("=" * 72)
lines.append(f" LOCAL MODEL BENCHMARK — {threshold:.0f} tok/s UX Threshold")
lines.append("=" * 72)
lines.append("")
# Summary table
header = f"{'Model':<25} {'Size':>6} {'tok/s':>8} {'Threshold':>10} {'Status':>8}"
lines.append(header)
lines.append("-" * 72)
passed = 0
failed = 0
errors = 0
for r in sorted(results, key=lambda x: x.avg_tok_s, reverse=True):
size_str = f"{r.size_gb:.1f}GB"
tok_s_str = f"{r.avg_tok_s:.1f}"
if r.error:
status = "ERROR"
errors += 1
elif r.meets_threshold:
status = "PASS"
passed += 1
else:
status = "FAIL"
failed += 1
marker = ">" if r.meets_threshold else "X" if r.error else "!"
thresh_str = f">= {threshold:.0f}"
lines.append(f" {marker} {r.model:<23} {size_str:>6} {tok_s_str:>8} {thresh_str:>10} {status:>8}")
lines.append("-" * 72)
lines.append(f" Passed: {passed} | Failed: {failed} | Errors: {errors} | Total: {len(results)}")
lines.append("")
# Detail section for failures
failures = [r for r in results if not r.meets_threshold and not r.error]
if failures:
lines.append(" FAILED MODELS (below threshold):")
for r in sorted(failures, key=lambda x: x.avg_tok_s):
gap = threshold - r.avg_tok_s
lines.append(f" - {r.model}: {r.avg_tok_s:.1f} tok/s "
f"({gap:.1f} tok/s short, {r.eval_tokens} avg tokens/round)")
lines.append("")
error_list = [r for r in results if r.error]
if error_list:
lines.append(" ERRORS:")
for r in error_list:
lines.append(f" - {r.model}: {r.error}")
lines.append("")
# Hardware info
import platform
lines.append(f" Host: {platform.node()} | {platform.system()} {platform.release()}")
lines.append(f" Ollama: {OLLAMA_BASE}")
lines.append("")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Benchmark local Ollama models vs 50 tok/s threshold")
parser.add_argument("--models", help="Comma-separated model names (default: all)")
parser.add_argument("--prompt", default=BENCHMARK_PROMPT, help="Benchmark prompt")
parser.add_argument("--rounds", type=int, default=3, help="Rounds per model (default: 3)")
parser.add_argument("--tokens", type=int, default=512, help="Max tokens to generate (default: 512)")
parser.add_argument("--json", action="store_true", help="JSON output for CI")
parser.add_argument("--all", action="store_true", help="Test all pulled models")
parser.add_argument("--threshold", type=float, default=THRESHOLD_TOK_S, help="tok/s threshold")
args = parser.parse_args()
threshold = args.threshold
# Get model list
available = get_models()
if not available:
print("No models found. Pull a model first: ollama pull <model>", file=sys.stderr)
sys.exit(1)
if args.models:
names = [m.strip() for m in args.models.split(",")]
models = [m for m in available if m["name"] in names]
missing = set(names) - set(m["name"] for m in models)
if missing:
print(f"Models not found: {', '.join(missing)}", file=sys.stderr)
print(f"Available: {', '.join(m['name'] for m in available)}", file=sys.stderr)
else:
models = available
print(f"Benchmarking {len(models)} model(s) against {threshold} tok/s threshold")
print(f"Ollama: {OLLAMA_BASE} | Rounds: {args.rounds} | Max tokens: {args.tokens}")
print()
results = []
for m in models:
name = m["name"]
size_gb = m.get("size", 0) / (1024**3)
print(f" {name} ({size_gb:.1f}GB):")
result = run_benchmark(name, size_gb, args.prompt, args.rounds, args.tokens, threshold)
results.append(result)
# Output
report = format_report(results, threshold)
if args.json:
output = {
"threshold_tok_s": threshold,
"ollama_base": OLLAMA_BASE,
"rounds": args.rounds,
"results": [asdict(r) for r in results],
"passed": sum(1 for r in results if r.meets_threshold),
"failed": sum(1 for r in results if not r.meets_threshold and not r.error),
"errors": sum(1 for r in results if r.error),
}
print(json.dumps(output, indent=2))
else:
print(report)
# Exit code: 0 if all pass, 1 if any fail/error
if any(not r.meets_threshold or r.error for r in results):
sys.exit(1)
sys.exit(0)
if __name__ == "__main__":
main()

View File

@@ -1,128 +0,0 @@
"""Tests for time-aware cron model routing — Issue #317."""
import pytest
from datetime import datetime
from agent.smart_model_routing import resolve_cron_model, _hour_in_window
class TestHourInWindow:
"""Hour-in-window detection including midnight wrap."""
def test_normal_window(self):
assert _hour_in_window(18, 17, 22) is True
assert _hour_in_window(16, 17, 22) is False
assert _hour_in_window(22, 17, 22) is False
def test_midnight_wrap(self):
assert _hour_in_window(23, 22, 6) is True
assert _hour_in_window(3, 22, 6) is True
assert _hour_in_window(10, 22, 6) is False
def test_edge_cases(self):
assert _hour_in_window(0, 0, 24) is True
assert _hour_in_window(23, 0, 24) is True
assert _hour_in_window(0, 22, 6) is True
assert _hour_in_window(5, 22, 6) is True
assert _hour_in_window(6, 22, 6) is False
class TestResolveCronModel:
"""Time-aware model resolution for cron jobs."""
def _config(self, **overrides):
base = {
"enabled": True,
"fallback_model": "anthropic/claude-sonnet-4",
"fallback_provider": "openrouter",
"windows": [
{"start_hour": 17, "end_hour": 22, "reason": "evening_error_peak"},
],
}
base.update(overrides)
return base
def test_disabled_returns_base(self):
result = resolve_cron_model("mimo", {"enabled": False}, now=datetime(2026, 4, 12, 18, 0))
assert result["model"] == "mimo"
assert result["overridden"] is False
def test_no_config_returns_base(self):
result = resolve_cron_model("mimo", None)
assert result["model"] == "mimo"
assert result["overridden"] is False
def test_no_windows_returns_base(self):
result = resolve_cron_model("mimo", {"enabled": True, "windows": []}, now=datetime(2026, 4, 12, 18, 0))
assert result["overridden"] is False
def test_evening_window_overrides(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 18, 0))
assert result["model"] == "anthropic/claude-sonnet-4"
assert result["provider"] == "openrouter"
assert result["overridden"] is True
assert "evening_error_peak" in result["reason"]
assert "hour=18" in result["reason"]
def test_outside_window_keeps_base(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 9, 0))
assert result["model"] == "mimo"
assert result["overridden"] is False
def test_window_boundary_start_inclusive(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 17, 0))
assert result["overridden"] is True
def test_window_boundary_end_exclusive(self):
result = resolve_cron_model("mimo", self._config(), now=datetime(2026, 4, 12, 22, 0))
assert result["overridden"] is False
def test_midnight_window(self):
config = self._config(windows=[{"start_hour": 22, "end_hour": 6, "reason": "overnight"}])
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 23, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 13, 3, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 10, 0))["overridden"] is False
def test_per_window_model_override(self):
config = self._config(windows=[{
"start_hour": 17, "end_hour": 22,
"model": "anthropic/claude-opus-4-6", "provider": "anthropic", "reason": "peak",
}])
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
assert result["model"] == "anthropic/claude-opus-4-6"
assert result["provider"] == "anthropic"
def test_first_matching_window_wins(self):
config = self._config(windows=[
{"start_hour": 17, "end_hour": 20, "model": "strong-1", "provider": "p1", "reason": "w1"},
{"start_hour": 19, "end_hour": 22, "model": "strong-2", "provider": "p2", "reason": "w2"},
])
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 19, 0))
assert result["model"] == "strong-1"
def test_no_fallback_model_keeps_base(self):
config = {"enabled": True, "windows": [{"start_hour": 17, "end_hour": 22, "reason": "test"}]}
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
assert result["overridden"] is False
assert result["model"] == "mimo"
def test_malformed_windows_skipped(self):
config = self._config(windows=[
"not-a-dict",
{"start_hour": 17},
{"end_hour": 22},
{"start_hour": "bad", "end_hour": "bad"},
{"start_hour": 17, "end_hour": 22, "reason": "valid"},
])
result = resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 18, 0))
assert result["overridden"] is True
assert "valid" in result["reason"]
def test_multiple_windows_coverage(self):
config = self._config(windows=[
{"start_hour": 17, "end_hour": 22, "reason": "evening"},
{"start_hour": 2, "end_hour": 5, "reason": "overnight"},
])
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 20, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 13, 3, 0))["overridden"] is True
assert resolve_cron_model("mimo", config, now=datetime(2026, 4, 12, 10, 0))["overridden"] is False