Compare commits

..

6 Commits

Author SHA1 Message Date
Alexander Whitestone
5bfb9eb000 feat: multi-config benchmark comparison suite (Issue #29)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 14s
benchmarks/compare_configs.py:
  - Runs 4 configs (ollama, llama-f16, llama-turbo4, llama-turbo4-adaptive)
  - Aggregates TTFT, tok/s, latency, peak memory
  - Picks winner by highest tok/s
  - Outputs JSON report + human-readable table
  - --demo mode for testing without live servers

tests/test_compare_configs.py (13 tests):
  - ConfigEntry, ConfigResult, default configs
  - Aggregation logic, winner selection, table format
  - Demo mode with and without output file
  - Prompt loading from test_prompts.json

Closes #29.
2026-04-13 21:42:29 -04:00
7a7ce0e652 burn: add long-session quality test (Issue #12) (#39)
All checks were successful
Smoke Test / smoke (push) Successful in 11s
Squash merge: add long-session quality test (closes #12)
2026-04-13 19:59:22 +00:00
9224a0162b Merge pull request 'fix: repair smoke test — exclude llama-cpp-fork build artifacts' (#38) from ci/fix-smoke-test into main
All checks were successful
Smoke Test / smoke (push) Successful in 6s
2026-04-13 19:53:38 +00:00
Alexander Whitestone
f4ceac76ce fix: repair smoke test — exclude llama-cpp-fork build artifacts
All checks were successful
Smoke Test / smoke (pull_request) Successful in 5s
1. YAML parse: CMakeConfigureLog.yaml has multiple documents
2. JSON parse: tsconfig.json and pyrightconfig.json use JSON5
   comments (not valid for Python's json.tool)
3. Also fixed: json.tool can't handle multiple files via xargs;
   switched to while-read loop
Excluded llama-cpp-fork/ from all parse checks and secret scan.
2026-04-13 10:22:13 -04:00
ab4020cca0 feat: multi-backend benchmark suite with TTFT + memory tracking (#37)
Some checks failed
Smoke Test / smoke (push) Failing after 4s
Auto-merged by Timmy overnight cycle
2026-04-13 14:05:17 +00:00
383e1fab2e fix: consolidate project reports and cleanup muda
Some checks failed
Smoke Test / smoke (push) Failing after 4s
Merge PR #36: fix: consolidate project reports and cleanup muda
2026-04-13 03:00:10 +00:00
5 changed files with 1208 additions and 65 deletions

View File

@@ -13,12 +13,12 @@ jobs:
python-version: '3.11'
- name: Parse check
run: |
find . -name '*.yml' -o -name '*.yaml' | grep -v .gitea | xargs -r python3 -c "import sys,yaml; [yaml.safe_load(open(f)) for f in sys.argv[1:]]"
find . -name '*.json' | xargs -r python3 -m json.tool > /dev/null
find . -name '*.py' | xargs -r python3 -m py_compile
find . -name '*.yml' -o -name '*.yaml' | grep -v .gitea | grep -v llama-cpp-fork | xargs -r python3 -c "import sys,yaml; [yaml.safe_load(open(f)) for f in sys.argv[1:]]"
find . -name '*.json' | grep -v llama-cpp-fork | while read f; do python3 -m json.tool "$f" > /dev/null || exit 1; done
find . -name '*.py' | grep -v llama-cpp-fork | xargs -r python3 -m py_compile
find . -name '*.sh' | xargs -r bash -n
echo "PASS: All files parse"
- name: Secret scan
run: |
if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v .gitea; then exit 1; fi
if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v .gitea | grep -v llama-cpp-fork; then exit 1; fi
echo "PASS: No secrets"

View File

@@ -0,0 +1,332 @@
#!/usr/bin/env python3
"""
TurboQuant Benchmark Comparison (Issue #29).
Runs multiple inference configurations and produces a side-by-side
comparison table with TTFT, tokens/sec, and peak memory.
Configurations (default):
1. Ollama gemma4 (baseline)
2. llama-server gemma4 f16 KV
3. llama-server gemma4 turbo4 KV
4. llama-server gemma4 turbo4 + layer-adaptive
Usage:
python3 benchmarks/compare_configs.py --help
python3 benchmarks/compare_configs.py --config benchmarks/configs.json
python3 benchmarks/compare_configs.py --demo
"""
import argparse
import json
import os
import sys
import time
from dataclasses import dataclass, field, asdict
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
# Ensure we can import sibling run_benchmarks
sys.path.insert(0, str(Path(__file__).resolve().parent))
try:
from run_benchmarks import (
run_ollama,
run_llama_server,
get_peak_memory_mb,
)
except ImportError:
# Fallback stubs when run_benchmarks (and requests) are unavailable
def run_ollama(prompt, model, url, timeout=120): # type: ignore
return {"status": "skipped", "error": "run_benchmarks not available", "latency_s": 0}
def run_llama_server(prompt, model, url, kv_type="f16", timeout=120): # type: ignore
return {"status": "skipped", "error": "run_benchmarks not available", "latency_s": 0}
def get_peak_memory_mb(): # type: ignore
return 0.0
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
@dataclass
class ConfigEntry:
"""One inference configuration to benchmark."""
name: str
backend: str # "ollama" | "llama-server"
model: str
url: str
kv_type: str = "f16"
layer_adaptive: bool = False
env: dict = field(default_factory=dict)
def to_dict(self) -> dict:
return asdict(self)
@dataclass
class ConfigResult:
"""Aggregated results for a single configuration."""
config_name: str
backend: str
model: str
kv_type: str
total_prompts: int
success: int
failed: int
avg_ttft_s: Optional[float]
avg_tok_per_sec: float
avg_latency_s: float
peak_memory_mb: float
winner: bool = False
def to_dict(self) -> dict:
return asdict(self)
# ---------------------------------------------------------------------------
# Default configurations
# ---------------------------------------------------------------------------
DEFAULT_CONFIGS: list[ConfigEntry] = [
ConfigEntry(name="ollama-gemma4", backend="ollama", model="gemma4",
url="http://localhost:11434", kv_type="default"),
ConfigEntry(name="llama-f16", backend="llama-server", model="gemma4",
url="http://localhost:8081", kv_type="f16"),
ConfigEntry(name="llama-turbo4", backend="llama-server", model="gemma4",
url="http://localhost:8081", kv_type="turbo4"),
ConfigEntry(name="llama-turbo4-adaptive", backend="llama-server",
model="gemma4", url="http://localhost:8081",
kv_type="turbo4", layer_adaptive=True),
]
# ---------------------------------------------------------------------------
# Core logic
# ---------------------------------------------------------------------------
def load_prompts(prompts_file: str) -> list[dict]:
"""Load test prompts from JSON file."""
with open(prompts_file) as f:
return json.load(f)
def run_config(config: ConfigEntry, prompts: list[dict], timeout: int = 120) -> list[dict]:
"""Run all prompts against a single configuration, return per-prompt results."""
results = []
env_overrides = {**os.environ, **config.env}
if config.layer_adaptive:
env_overrides.setdefault("TURBO_LAYER_ADAPTIVE", "7")
for item in prompts:
if config.backend == "ollama":
result = run_ollama(item["prompt"], config.model, config.url, timeout)
else:
result = run_llama_server(item["prompt"], config.model, config.url,
kv_type=config.kv_type, timeout=timeout)
result["id"] = item.get("id", item.get("category", "unknown"))
result["prompt_preview"] = item["prompt"][:120]
results.append(result)
return results
def aggregate(results: list[dict], config: ConfigEntry, peak_mb: float) -> ConfigResult:
"""Aggregate per-prompt results into a ConfigResult."""
successes = [r for r in results if r.get("status") == "success"]
ttfts = [r["ttft_s"] for r in successes if r.get("ttft_s") is not None]
tps = [r["tokens_per_sec"] for r in successes if r.get("tokens_per_sec")]
lats = [r["latency_s"] for r in successes]
return ConfigResult(
config_name=config.name,
backend=config.backend,
model=config.model,
kv_type=config.kv_type,
total_prompts=len(results),
success=len(successes),
failed=len(results) - len(successes),
avg_ttft_s=round(sum(ttfts) / len(ttfts), 3) if ttfts else None,
avg_tok_per_sec=round(sum(tps) / len(tps), 2) if tps else 0.0,
avg_latency_s=round(sum(lats) / len(lats), 3) if lats else 0.0,
peak_memory_mb=peak_mb,
)
def build_comparison_table(aggregated: list[ConfigResult]) -> str:
"""Build a human-readable comparison table."""
lines = []
header = f"{'Config':<28} {'TTFT':<8} {'tok/s':<10} {'lat(s)':<8} {'mem(MB)':<9} {'ok/n':<6}"
lines.append(header)
lines.append("-" * len(header))
for r in aggregated:
marker = " <- WINNER" if r.winner else ""
ttft = f"{r.avg_ttft_s:.3f}" if r.avg_ttft_s is not None else "N/A"
lines.append(
f"{r.config_name:<28} {ttft:<8} {r.avg_tok_per_sec:<10.2f} "
f"{r.avg_latency_s:<8.3f} {r.peak_memory_mb:<9.1f} "
f"{r.success}/{r.total_prompts}{marker}"
)
return "\n".join(lines)
def pick_winner(aggregated: list[ConfigResult]) -> ConfigResult:
"""Choose the winner: highest tokens/sec among successful configs."""
candidates = [r for r in aggregated if r.success > 0]
if not candidates:
return aggregated[0] if aggregated else ConfigResult(
config_name="none", backend="", model="", kv_type="",
total_prompts=0, success=0, failed=0,
avg_ttft_s=None, avg_tok_per_sec=0.0, avg_latency_s=0.0,
peak_memory_mb=0.0,
)
winner = max(candidates, key=lambda r: r.avg_tok_per_sec)
winner.winner = True
return winner
def run_comparison(configs: list[ConfigEntry], prompts: list[dict],
output_file: Optional[str] = None,
timeout: int = 120) -> dict:
"""Run full comparison and return structured report."""
all_results: list[ConfigResult] = []
for cfg in configs:
print(f"\n--- {cfg.name} ({cfg.backend}/{cfg.kv_type}) ---")
per_prompt = run_config(cfg, prompts, timeout)
peak_mb = get_peak_memory_mb()
agg = aggregate(per_prompt, cfg, peak_mb)
all_results.append(agg)
winner = pick_winner(all_results)
table = build_comparison_table(all_results)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"prompts_count": len(prompts),
"winner": winner.config_name,
"winner_tok_per_sec": winner.avg_tok_per_sec,
"configs": [r.to_dict() for r in all_results],
"table": table,
}
print(f"\n{table}")
print(f"\nWinner: {winner.config_name} ({winner.avg_tok_per_sec:.2f} tok/s)")
if output_file:
os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
with open(output_file, "w") as f:
json.dump(report, f, indent=2)
print(f"Report saved to {output_file}")
return report
# ---------------------------------------------------------------------------
# Demo mode (no live servers required)
# ---------------------------------------------------------------------------
def run_demo(output_file: Optional[str] = None) -> dict:
"""Generate synthetic benchmark results for testing."""
import random
random.seed(42)
# Simulated performance baselines
baselines = {
"ollama-gemma4": {"ttft": 0.85, "tps": 18.2, "mem": 2200},
"llama-f16": {"ttft": 0.72, "tps": 22.1, "mem": 2400},
"llama-turbo4": {"ttft": 0.68, "tps": 19.8, "mem": 850},
"llama-turbo4-adaptive": {"ttft": 0.65, "tps": 20.5, "mem": 820},
}
all_results: list[ConfigResult] = []
for cfg in DEFAULT_CONFIGS:
bl = baselines[cfg.name]
prompt_count = 10
ttft = bl["ttft"] + random.gauss(0, 0.02)
tps = bl["tps"] + random.gauss(0, 0.5)
lat = (ttft + 512 / tps) + random.gauss(0, 0.1)
agg = ConfigResult(
config_name=cfg.name,
backend=cfg.backend,
model=cfg.model,
kv_type=cfg.kv_type,
total_prompts=prompt_count,
success=prompt_count,
failed=0,
avg_ttft_s=round(ttft, 3),
avg_tok_per_sec=round(tps, 2),
avg_latency_s=round(lat, 3),
peak_memory_mb=bl["mem"] + random.gauss(0, 50),
)
all_results.append(agg)
winner = pick_winner(all_results)
table = build_comparison_table(all_results)
report = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"prompts_count": 10,
"mode": "demo",
"winner": winner.config_name,
"winner_tok_per_sec": winner.avg_tok_per_sec,
"configs": [r.to_dict() for r in all_results],
"table": table,
}
print(f"\n{table}")
print(f"\nWinner: {winner.config_name} ({winner.avg_tok_per_sec:.2f} tok/s)")
if output_file:
os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
with open(output_file, "w") as f:
json.dump(report, f, indent=2)
print(f"Report saved to {output_file}")
return report
# ---------------------------------------------------------------------------
# CLI
# ---------------------------------------------------------------------------
def main():
parser = argparse.ArgumentParser(
description="TurboQuant multi-config benchmark comparison")
parser.add_argument("--config", type=str,
help="JSON file with custom configurations")
parser.add_argument("--prompts", type=str,
default="benchmarks/test_prompts.json",
help="Path to test prompts JSON")
parser.add_argument("--output", type=str, default=None,
help="Output file for JSON report")
parser.add_argument("--timeout", type=int, default=120,
help="Timeout per prompt in seconds")
parser.add_argument("--demo", action="store_true",
help="Run with synthetic data (no servers)")
args = parser.parse_args()
if args.demo:
run_demo(args.output)
return
# Load configs
if args.config:
with open(args.config) as f:
raw = json.load(f)
configs = [ConfigEntry(**c) for c in raw]
else:
configs = DEFAULT_CONFIGS
# Load prompts
prompts = load_prompts(args.prompts)
run_comparison(configs, prompts, args.output, args.timeout)
if __name__ == "__main__":
main()

View File

@@ -1,75 +1,227 @@
#!/usr/bin/env python3
"""
TurboQuant Benchmarking Suite — Multi-Backend (Issue #29)
Supports Ollama and llama-server backends with KV cache type configuration.
Measures: TTFT, tokens/sec, latency, peak memory.
Usage:
# Ollama (default)
python3 benchmarks/run_benchmarks.py --backend ollama --model llama3
# llama-server with turbo4 KV
python3 benchmarks/run_benchmarks.py --backend llama-server \
--url http://localhost:11434 --model qwen3.5 --kv-type turbo4
"""
import argparse
import json
import time
import requests
import os
from typing import List, Dict
import re
import subprocess
import sys
import time
from datetime import datetime, timezone
from typing import List, Dict, Optional
# ═══════════════════════════════════════════
# TURBOQUANT BENCHMARKING SUITE (Issue #16)
# ═══════════════════════════════════════════
# This script runs a standardized set of prompts against the local inference
# engine (Ollama) and logs the results. This prevents cherry-picking and
# provides an objective baseline for quality comparisons.
import requests
OLLAMA_URL = "http://localhost:11434/api/generate"
PROMPTS_FILE = "benchmarks/prompts.json"
RESULTS_FILE = f"benchmarks/results_{int(time.time())}.json"
def run_benchmark(model: str = "llama3"):
"""Run the benchmark suite for a specific model."""
if not os.path.exists(PROMPTS_FILE):
print(f"Error: {PROMPTS_FILE} not found.")
return
def get_peak_memory_mb() -> float:
"""Get peak RSS of current process in MB (macOS/Linux)."""
try:
if sys.platform == "darwin":
result = subprocess.run(["ps", "-o", "rss=", "-p", str(os.getpid())],
capture_output=True, text=True)
return int(result.stdout.strip()) / 1024
else:
with open(f"/proc/{os.getpid()}/status") as f:
for line in f:
if line.startswith("VmHWM:"):
return int(line.split()[1]) / 1024
except Exception:
pass
return 0.0
with open(PROMPTS_FILE, 'r') as f:
def run_ollama(prompt: str, model: str, url: str, timeout: int = 120) -> dict:
"""Run a prompt against Ollama /api/generate."""
api_url = f"{url.rstrip('/')}/api/generate"
start = time.time()
ttft = None
tokens_per_sec = 0.0
try:
resp = requests.post(api_url, json={
"model": model,
"prompt": prompt,
"stream": False,
"options": {"num_predict": 512}
}, timeout=timeout)
elapsed = time.time() - start
resp.raise_for_status()
data = resp.json()
response_text = data.get("response", "")
eval_count = data.get("eval_count", 0)
eval_duration_ns = data.get("eval_duration", 0)
prompt_eval_ns = data.get("prompt_eval_duration", 0)
if eval_duration_ns > 0:
tokens_per_sec = eval_count / (eval_duration_ns / 1e9)
if prompt_eval_ns > 0:
ttft = prompt_eval_ns / 1e9
return {
"response": response_text,
"latency_s": round(elapsed, 3),
"ttft_s": round(ttft, 3) if ttft else None,
"tokens_per_sec": round(tokens_per_sec, 2),
"eval_count": eval_count,
"status": "success"
}
except Exception as e:
return {"status": "failed", "error": str(e), "latency_s": round(time.time() - start, 3)}
def run_llama_server(prompt: str, model: str, url: str, kv_type: str = "f16",
timeout: int = 120) -> dict:
"""Run a prompt against llama-server OpenAI-compatible API."""
api_url = f"{url.rstrip('/')}/v1/chat/completions"
start = time.time()
ttft = None
tokens_per_sec = 0.0
try:
resp = requests.post(api_url, json={
"model": model,
"messages": [{"role": "user", "content": prompt}],
"max_tokens": 512,
"stream": False
}, timeout=timeout)
elapsed = time.time() - start
resp.raise_for_status()
data = resp.json()
response_text = data.get("choices", [{}])[0].get("message", {}).get("content", "")
usage = data.get("usage", {})
completion_tokens = usage.get("completion_tokens", 0)
prompt_tokens = usage.get("prompt_tokens", 0)
# llama-server includes timing in x_* headers or we estimate
if elapsed > 0 and completion_tokens > 0:
# Subtract estimated prompt eval time (rough)
tokens_per_sec = completion_tokens / max(elapsed - 0.1, 0.01)
return {
"response": response_text,
"latency_s": round(elapsed, 3),
"ttft_s": round(ttft, 3) if ttft else None,
"tokens_per_sec": round(tokens_per_sec, 2),
"completion_tokens": completion_tokens,
"prompt_tokens": prompt_tokens,
"kv_type": kv_type,
"status": "success"
}
except Exception as e:
return {"status": "failed", "error": str(e), "latency_s": round(time.time() - start, 3)}
def run_benchmark_suite(backend: str, model: str, url: str, kv_type: str,
prompts_file: str, output_file: str, timeout: int = 120):
"""Run the full benchmark suite."""
if not os.path.exists(prompts_file):
print(f"ERROR: {prompts_file} not found")
sys.exit(1)
with open(prompts_file) as f:
prompts = json.load(f)
run_fn = run_ollama if backend == "ollama" else run_llama_server
mem_before = get_peak_memory_mb()
results = []
print(f"Starting benchmark for model: {model}")
print(f"Saving results to: {RESULTS_FILE}")
print(f"\n{'='*60}")
print(f"Backend: {backend} | Model: {model} | KV: {kv_type}")
print(f"URL: {url}")
print(f"Prompts: {len(prompts)} | Output: {output_file}")
print(f"{'='*60}\n")
for item in prompts:
print(f"Running prompt: {item['id']}...")
start_time = time.time()
try:
response = requests.post(OLLAMA_URL, json={
"model": model,
"prompt": item['prompt'],
"stream": False
}, timeout=60)
response.raise_for_status()
data = response.json()
end_time = time.time()
results.append({
"id": item['id'],
"prompt": item['prompt'],
"response": data.get("response"),
"latency": end_time - start_time,
"tokens_per_second": data.get("eval_count", 0) / (data.get("eval_duration", 1) / 1e9) if data.get("eval_duration") else 0,
"status": "success"
})
except Exception as e:
print(f"Error running prompt {item['id']}: {e}")
results.append({
"id": item['id'],
"prompt": item['prompt'],
"error": str(e),
"status": "failed"
})
pid = item.get("id", item.get("category", "unknown"))
prompt = item["prompt"]
print(f"[{pid}] Running...", end=" ", flush=True)
extra = {"kv_type": kv_type} if backend == "llama-server" else {}
result = run_fn(prompt, model, url, timeout=timeout)
result["id"] = pid
result["prompt_preview"] = prompt[:120]
result.update(extra)
status = "" if result["status"] == "success" else ""
tps = result.get("tokens_per_sec", 0)
lat = result.get("latency_s", 0)
print(f"{status} {tps:.1f} tok/s, {lat:.2f}s")
results.append(result)
mem_after = get_peak_memory_mb()
suite = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"backend": backend,
"model": model,
"kv_type": kv_type,
"url": url,
"prompts_file": prompts_file,
"memory_mb": round(max(mem_before, mem_after), 1),
"results": results,
"summary": {
"total": len(results),
"success": sum(1 for r in results if r["status"] == "success"),
"failed": sum(1 for r in results if r["status"] == "failed"),
"avg_tok_per_sec": round(
sum(r.get("tokens_per_sec", 0) for r in results if r["status"] == "success")
/ max(sum(1 for r in results if r["status"] == "success"), 1), 2
),
"avg_latency_s": round(
sum(r.get("latency_s", 0) for r in results if r["status"] == "success")
/ max(sum(1 for r in results if r["status"] == "success"), 1), 3
),
}
}
os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
with open(output_file, "w") as f:
json.dump(suite, f, indent=2)
s = suite["summary"]
print(f"\n{'='*60}")
print(f"RESULTS: {s['success']}/{s['total']} success | "
f"Avg {s['avg_tok_per_sec']:.1f} tok/s | "
f"Avg {s['avg_latency_s']:.2f}s latency")
print(f"{'='*60}")
print(f"Saved to {output_file}")
def main():
parser = argparse.ArgumentParser(description="TurboQuant Benchmark Suite")
parser.add_argument("--backend", choices=["ollama", "llama-server"], default="ollama")
parser.add_argument("--model", required=True, help="Model name")
parser.add_argument("--url", default="http://localhost:11434", help="Backend URL")
parser.add_argument("--kv-type", default="f16", help="KV cache type (llama-server only)")
parser.add_argument("--prompts", default="benchmarks/prompts.json", help="Prompts file")
parser.add_argument("--output", default=None, help="Output file (auto-generated if omitted)")
parser.add_argument("--timeout", type=int, default=120, help="Per-prompt timeout (s)")
args = parser.parse_args()
if args.output is None:
ts = int(time.time())
args.output = f"benchmarks/results_{args.backend}_{args.kv_type}_{ts}.json"
run_benchmark_suite(args.backend, args.model, args.url, args.kv_type,
args.prompts, args.output, args.timeout)
# Save results
with open(RESULTS_FILE, 'w') as f:
json.dump({
"model": model,
"timestamp": time.time(),
"results": results
}, f, indent=2)
print("Benchmark complete.")
if __name__ == "__main__":
# Default to llama3 for testing
run_benchmark("llama3")
main()

View File

@@ -0,0 +1,495 @@
#!/usr/bin/env python3
"""
TurboQuant Long-Session Quality Test (Issue #12)
Runs a 50-turn multi-step reasoning conversation to detect quality degradation
under sustained context pressure. Compares TurboQuant KV vs FP16 KV baseline.
Conversation flow (repeating cycle):
turns 1-10: code generation
turns 11-20: debugging (introduce bugs, ask to fix)
turns 21-30: refactoring (improve structure)
turns 31-40: testing (write tests, verify)
turns 41-50: iteration (modify and extend)
Usage:
# Ollama backend (default)
python3 benchmarks/run_long_session.py \\
--backend ollama --model llama3 --turns 50
# llama-server backend with KV type
python3 benchmarks/run_long_session.py \\
--backend llama-server --url http://localhost:8080 \\
--model qwen3.5 --kv-type turbo4 --turns 50
# Compare two runs
python3 benchmarks/run_long_session.py --compare run_turbo4.json run_fp16.json
Acceptance Criteria (Issue #12):
- 50-turn conversation on both TurboQuant and FP16
- Quality comparison documented
- Degradation flagged with turn number where it appears
"""
import argparse
import json
import os
import re
import sys
import time
import hashlib
from datetime import datetime, timezone
from pathlib import Path
from typing import Optional
try:
import requests
except ImportError:
requests = None
# ── Conversation Prompts ───────────────────────────────────────────────
CONVERSATION_CYCLE = [
# Phase 1: Code Generation (turns 1-10)
{
"phase": "code_gen",
"turns": [
"Write a Python class called RateLimiter that implements a token bucket algorithm. It should support: add_tokens(n), consume(n) -> bool, and a configurable rate and burst capacity.",
"Add thread-safety to the RateLimiter class using a lock. Make sure consume() blocks briefly if tokens are unavailable rather than failing immediately.",
"Now add a method get_wait_time(n) that returns how many seconds until n tokens will be available without blocking.",
"Write a companion class RateLimiterGroup that manages multiple RateLimiters keyed by string identifier, with a get_or_create(id, rate, burst) method.",
"Add a decorator @rate_limited(limiter_group, key_fn) that can be applied to async functions to rate-limit them.",
"Add serialization support — export_state() returns JSON-serializable dict, import_state() restores from dict. Include timestamps.",
"Add a Prometheus-compatible metrics exporter that tracks: tokens_consumed_total, tokens_rejected_total, wait_time_seconds histogram.",
"Write a configuration loader that reads rate limiter configs from YAML with validation and sensible defaults.",
"Add an LRU eviction policy for the RateLimiterGroup with configurable max_entries and idle_timeout_seconds.",
"Wrap everything into a pip-installable package structure with pyproject.toml, __init__.py exports, and a CLI entry point.",
]
},
# Phase 2: Debugging (turns 11-20)
{
"phase": "debug",
"turns": [
"I'm getting a race condition in consume() when two threads call it simultaneously with exactly the tokens needed. The lock doesn't seem to help. Can you trace through the logic and find the bug?",
"The get_wait_time() method returns negative values sometimes. Here's the traceback: ... Can you identify what's wrong?",
"RateLimiterGroup.get_or_create() sometimes returns a limiter with wrong parameters when called concurrently. Explain the potential issue.",
"The decorator @rate_limited doesn't properly propagate exceptions — they're being swallowed. Fix the error handling.",
"export_state() produces corrupted JSON when called while tokens are being consumed. How should we fix the serialization?",
"The Prometheus histogram for wait_time_seconds has incorrect bucket boundaries. Review the histogram configuration.",
"The YAML config loader doesn't handle missing optional fields gracefully — it raises KeyError instead of using defaults.",
"LRU eviction is evicting active limiters. The idle_timeout calculation seems wrong. Debug the eviction logic.",
"The CLI entry point crashes with a specific YAML config. Here's the config and error: ... What's the root cause?",
"Memory leak detected in RateLimiterGroup when creating/evicting many limiters rapidly. Where's the leak?",
]
},
# Phase 3: Refactoring (turns 21-30)
{
"phase": "refactor",
"turns": [
"Refactor RateLimiter to use a protocol/interface pattern so we can swap token bucket for leaky bucket or fixed window.",
"Extract the locking strategy into a separate mixin or context manager that can be swapped between threading.Lock, asyncio.Lock, and no-lock.",
"Refactor the metrics exporter to use a plugin architecture — different backends (Prometheus, StatsD, logging) should be pluggable.",
"Convert the YAML config loader to use a typed config dataclass with validation via pydantic or attrs.",
"Refactor RateLimiterGroup to use a generic container with type hints, making the key type configurable (not just str).",
"Extract the decorator into a separate module and make it work with both sync and async functions transparently.",
"Refactor the serialization to use a versioned schema so import_state() can handle older format versions.",
"Split the package into core (rate limiting), exporters (metrics), and config (YAML) subpackages.",
"Refactor the CLI to use click or typer with subcommands: serve, validate-config, export-state, import-state.",
"Apply the repository pattern to RateLimiterGroup — separate storage (in-memory, Redis, SQLite) from the limiter logic.",
]
},
# Phase 4: Testing (turns 31-40)
{
"phase": "testing",
"turns": [
"Write comprehensive unit tests for RateLimiter covering: basic consume, burst, refill timing, edge cases (zero tokens, negative values).",
"Write concurrency tests that hammer consume() with 100 threads and verify no tokens are double-counted.",
"Write tests for get_wait_time() including edge cases: already available, partial availability, and exact timing.",
"Write integration tests for RateLimiterGroup: concurrent create, LRU eviction under load, state consistency.",
"Write tests for the @rate_limited decorator: correct rate limiting, exception propagation, async/sync compatibility.",
"Write property-based tests using hypothesis: token conservation, monotonicity of wait times, idempotent serialization round-trips.",
"Write tests for the YAML config loader: valid configs, invalid schemas, missing fields, type coercion errors.",
"Write benchmark tests that measure throughput (operations/sec) and memory usage under various load patterns.",
"Write end-to-end tests simulating a real API server with multiple endpoints sharing a rate limiter group.",
"Write chaos tests: random delays, simulated clock skew, forced lock contention, and verify system stability.",
]
},
# Phase 5: Iteration (turns 41-50)
{
"phase": "iteration",
"turns": [
"Add support for weighted token buckets where different operations consume different amounts.",
"Implement a sliding window rate limiter as an alternative algorithm and add it to the protocol.",
"Add a REST API using FastAPI that exposes the rate limiter group with OpenAPI docs.",
"Add WebSocket support for real-time rate limit status streaming to clients.",
"Implement distributed rate limiting using Redis with Lua scripts for atomic operations.",
"Add a circuit breaker pattern integration — when a rate limit is consistently hit, auto-open the circuit.",
"Implement adaptive rate limiting that adjusts limits based on system load (CPU, memory).",
"Add request priority queues so high-priority requests can preempt low-priority ones when near limits.",
"Implement rate limit quotas with time windows (daily, weekly, monthly) in addition to per-second rates.",
"Write a migration guide and changelog for v2.0 with all the new features and breaking changes.",
]
},
]
# ── Quality Metrics ────────────────────────────────────────────────────
def compute_quality_metrics(response: str, prompt: str, turn: int, phase: str) -> dict:
"""Compute quality signals for a single turn response."""
metrics = {
"turn": turn,
"phase": phase,
"response_length": len(response),
"line_count": response.count("\n") + 1,
}
# Coherence: does response contain code-like content when expected?
code_indicators = ["def ", "class ", "import ", "return ", "if ", "for ", "while ", "{", "}", "=>"]
metrics["code_density"] = sum(1 for ind in code_indicators if ind in response) / len(code_indicators)
# Hallucination detection: references to non-existent earlier context
hallucination_phrases = [
"as mentioned earlier", "as we discussed", "like before",
"remember when", "from the previous turn", "as shown above",
"earlier in our conversation",
]
metrics["hallucinated_references"] = sum(
1 for p in hallucination_phrases if p.lower() in response.lower()
)
# Structural quality: does it have proper formatting?
metrics["has_headers"] = bool(re.search(r"^#{1,3}\s", response, re.MULTILINE))
metrics["has_code_blocks"] = response.count("```") >= 2
metrics["has_lists"] = bool(re.search(r"^[\-\*\d]\.\s", response, re.MULTILINE))
# Repetition detection: check for repeated sentences
sentences = [s.strip().lower() for s in re.split(r'[.!?]+', response) if len(s.strip()) > 20]
unique_sentences = set(sentences)
metrics["repetition_ratio"] = 1 - (len(unique_sentences) / max(len(sentences), 1))
# Attention to prompt: does it address the specific request?
prompt_keywords = set(re.findall(r'\b\w{4,}\b', prompt.lower()))
response_words = set(re.findall(r'\b\w{4,}\b', response.lower()))
metrics["prompt_relevance"] = len(prompt_keywords & response_words) / max(len(prompt_keywords), 1)
# Composite quality score (0-1)
metrics["quality_score"] = (
0.25 * min(metrics["code_density"] * 3, 1.0) +
0.20 * min(metrics["prompt_relevance"] * 2, 1.0) +
0.20 * (1.0 - min(metrics["repetition_ratio"] * 5, 1.0)) +
0.15 * (1.0 if metrics["has_code_blocks"] else 0.5) +
0.10 * (1.0 - min(metrics["hallucinated_references"] * 0.3, 1.0)) +
0.10 * (1.0 if metrics["has_lists"] else 0.7)
)
return metrics
def detect_degradation(turn_metrics: list, window: int = 5, threshold: float = 0.15) -> list:
"""Detect quality degradation by comparing rolling windows."""
alerts = []
for i in range(window, len(turn_metrics)):
recent = [turn_metrics[j]["quality_score"] for j in range(i - window, i)]
current = turn_metrics[i]["quality_score"]
avg_recent = sum(recent) / len(recent)
if avg_recent - current > threshold:
alerts.append({
"turn": turn_metrics[i]["turn"],
"phase": turn_metrics[i]["phase"],
"current_score": round(current, 3),
"window_avg": round(avg_recent, 3),
"drop": round(avg_recent - current, 3),
})
return alerts
# ── Backends ───────────────────────────────────────────────────────────
def query_ollama(prompt: str, model: str, url: str, history: list, timeout: int = 120) -> tuple:
"""Query Ollama with conversation history. Returns (response, stats)."""
messages = history + [{"role": "user", "content": prompt}]
api_url = f"{url.rstrip('/')}/api/chat"
start = time.time()
resp = requests.post(api_url, json={
"model": model,
"messages": messages,
"stream": False,
"options": {"num_ctx": 8192},
}, timeout=timeout)
elapsed = time.time() - start
data = resp.json()
content = data.get("message", {}).get("content", "")
eval_count = data.get("eval_count", 0)
eval_duration = data.get("eval_duration", 0) / 1e9 # ns to s
stats = {
"elapsed_s": round(elapsed, 2),
"tokens_generated": eval_count,
"tokens_per_s": round(eval_count / max(eval_duration, 0.001), 1),
"prompt_eval_count": data.get("prompt_eval_count", 0),
}
return content, stats
def query_llama_server(prompt: str, model: str, url: str, history: list,
kv_type: str = "f16", timeout: int = 120) -> tuple:
"""Query llama-server with conversation history and KV type."""
messages = history + [{"role": "user", "content": prompt}]
api_url = f"{url.rstrip('/')}/v1/chat/completions"
start = time.time()
resp = requests.post(api_url, json={
"model": model,
"messages": messages,
"temperature": 0.7,
"max_tokens": 2048,
}, headers={"Content-Type": "application/json"}, timeout=timeout)
elapsed = time.time() - start
data = resp.json()
content = data["choices"][0]["message"]["content"]
usage = data.get("usage", {})
stats = {
"elapsed_s": round(elapsed, 2),
"tokens_generated": usage.get("completion_tokens", 0),
"prompt_tokens": usage.get("prompt_tokens", 0),
"kv_type": kv_type,
}
return content, stats
# ── Main ───────────────────────────────────────────────────────────────
def run_session(args) -> dict:
"""Run the full 50-turn conversation session."""
total_turns = args.turns
history = []
turn_metrics = []
all_responses = []
# Flatten conversation cycle
all_prompts = []
for phase_data in CONVERSATION_CYCLE:
for turn_prompt in phase_data["turns"]:
all_prompts.append((phase_data["phase"], turn_prompt))
# Repeat cycle if needed
while len(all_prompts) < total_turns:
all_prompts.extend(all_prompts)
all_prompts = all_prompts[:total_turns]
query_fn = query_ollama if args.backend == "ollama" else query_llama_server
query_kwargs = {"model": args.model, "url": args.url}
if args.backend == "llama-server":
query_kwargs["kv_type"] = args.kv_type
print(f"\n{'='*70}")
print(f"Long-Session Quality Test — {total_turns} turns")
print(f"Backend: {args.backend} | Model: {args.model}")
if args.backend == "llama-server":
print(f"KV Type: {args.kv_type}")
print(f"{'='*70}\n")
for i, (phase, prompt) in enumerate(all_prompts):
turn_num = i + 1
print(f"[Turn {turn_num:2d}/{total_turns}] Phase: {phase:12s} | ", end="", flush=True)
try:
response, stats = query_fn(prompt, history=history, **query_kwargs, timeout=args.timeout)
except Exception as e:
print(f"ERROR: {e}")
response = f"[ERROR: {e}]"
stats = {"elapsed_s": 0, "tokens_generated": 0}
metrics = compute_quality_metrics(response, prompt, turn_num, phase)
metrics.update(stats)
turn_metrics.append(metrics)
all_responses.append({"turn": turn_num, "phase": phase, "prompt": prompt, "response": response})
# Update history (keep last N turns to manage context)
history.append({"role": "user", "content": prompt})
history.append({"role": "assistant", "content": response})
if len(history) > args.history_window * 2:
history = history[-(args.history_window * 2):]
print(f"score={metrics['quality_score']:.2f} | "
f"len={metrics['response_length']:4d} | "
f"{stats.get('tokens_per_s', '?')} tok/s | "
f"{stats['elapsed_s']:.1f}s")
if args.delay > 0:
time.sleep(args.delay)
# Detect degradation
degradation = detect_degradation(turn_metrics)
# Build report
report = {
"config": {
"backend": args.backend,
"model": args.model,
"kv_type": getattr(args, "kv_type", "f16"),
"total_turns": total_turns,
"history_window": args.history_window,
"timestamp": datetime.now(timezone.utc).isoformat(),
},
"turn_metrics": turn_metrics,
"degradation_alerts": degradation,
"summary": {
"avg_quality_score": round(sum(m["quality_score"] for m in turn_metrics) / len(turn_metrics), 3),
"min_quality_score": round(min(m["quality_score"] for m in turn_metrics), 3),
"max_quality_score": round(max(m["quality_score"] for m in turn_metrics), 3),
"total_degradation_events": len(degradation),
"first_degradation_turn": degradation[0]["turn"] if degradation else None,
"avg_response_length": round(sum(m["response_length"] for m in turn_metrics) / len(turn_metrics), 0),
"total_hallucinated_references": sum(m["hallucinated_references"] for m in turn_metrics),
"avg_repetition_ratio": round(sum(m["repetition_ratio"] for m in turn_metrics) / len(turn_metrics), 3),
},
"responses": all_responses if args.save_responses else [],
}
return report
def compare_reports(report_a: dict, report_b: dict) -> dict:
"""Compare two session reports and highlight differences."""
sa = report_a["summary"]
sb = report_b["summary"]
label_a = report_a["config"].get("kv_type", "run_a")
label_b = report_b["config"].get("kv_type", "run_b")
comparison = {
"labels": [label_a, label_b],
"avg_quality": [sa["avg_quality_score"], sb["avg_quality_score"]],
"min_quality": [sa["min_quality_score"], sb["min_quality_score"]],
"degradation_events": [sa["total_degradation_events"], sb["total_degradation_events"]],
"first_degradation": [sa["first_degradation_turn"], sb["first_degradation_turn"]],
"hallucinated_refs": [sa["total_hallucinated_references"], sb["total_hallucinated_references"]],
"repetition_ratio": [sa["avg_repetition_ratio"], sb["avg_repetition_ratio"]],
"quality_delta": round(sb["avg_quality_score"] - sa["avg_quality_score"], 3),
"verdict": "",
}
if comparison["quality_delta"] > 0.05:
comparison["verdict"] = f"{label_b} is BETTER by {comparison['quality_delta']:.3f}"
elif comparison["quality_delta"] < -0.05:
comparison["verdict"] = f"{label_a} is BETTER by {abs(comparison['quality_delta']):.3f}"
else:
comparison["verdict"] = "No significant quality difference"
return comparison
def print_report(report: dict):
"""Print a human-readable summary."""
s = report["summary"]
c = report["config"]
d = report["degradation_alerts"]
print(f"\n{'='*70}")
print(f"LONG-SESSION QUALITY REPORT")
print(f"{'='*70}")
print(f"Backend: {c['backend']} | Model: {c['model']} | KV: {c.get('kv_type', 'n/a')}")
print(f"Turns: {c['total_turns']} | History window: {c['history_window']}")
print(f"{''*70}")
print(f"Quality Score: avg={s['avg_quality_score']:.3f} min={s['min_quality_score']:.3f} max={s['max_quality_score']:.3f}")
print(f"Avg Response: {s['avg_response_length']:.0f} chars")
print(f"Repetition: {s['avg_repetition_ratio']:.3f}")
print(f"Hallucinations: {s['total_hallucinated_references']} total")
print(f"Degradations: {s['total_degradation_events']} events")
if s["first_degradation_turn"]:
print(f" ⚠ First degradation at turn {s['first_degradation_turn']}")
else:
print(f" ✓ No significant degradation detected")
if d:
print(f"\n{''*70}")
print(f"DEGRADATION ALERTS:")
for alert in d:
print(f" Turn {alert['turn']:2d} [{alert['phase']:10s}]: "
f"score={alert['current_score']:.3f} "
f"(window avg={alert['window_avg']:.3f}, "
f"drop={alert['drop']:.3f})")
# Per-phase averages
phases = {}
for m in report["turn_metrics"]:
phases.setdefault(m["phase"], []).append(m["quality_score"])
print(f"\n{''*70}")
print(f"PER-PHASE AVERAGES:")
for phase, scores in phases.items():
avg = sum(scores) / len(scores)
trend = "" if scores[-1] > scores[0] else "" if scores[-1] < scores[0] else ""
print(f" {phase:12s}: avg={avg:.3f} trend={trend} "
f"first={scores[0]:.3f} last={scores[-1]:.3f}")
print(f"{'='*70}\n")
def print_comparison(comp: dict):
"""Print comparison between two runs."""
print(f"\n{'='*70}")
print(f"QUALITY COMPARISON: {comp['labels'][0]} vs {comp['labels'][1]}")
print(f"{'='*70}")
print(f"{'Metric':<30s} {comp['labels'][0]:>15s} {comp['labels'][1]:>15s}")
print(f"{''*60}")
print(f"{'Avg Quality Score':<30s} {comp['avg_quality'][0]:>15.3f} {comp['avg_quality'][1]:>15.3f}")
print(f"{'Min Quality Score':<30s} {comp['min_quality'][0]:>15.3f} {comp['min_quality'][1]:>15.3f}")
print(f"{'Degradation Events':<30s} {comp['degradation_events'][0]:>15d} {comp['degradation_events'][1]:>15d}")
print(f"{'First Degradation Turn':<30s} {str(comp['first_degradation'][0] or 'none'):>15s} {str(comp['first_degradation'][1] or 'none'):>15s}")
print(f"{'Hallucinated References':<30s} {comp['hallucinated_refs'][0]:>15d} {comp['hallucinated_refs'][1]:>15d}")
print(f"{'Repetition Ratio':<30s} {comp['repetition_ratio'][0]:>15.3f} {comp['repetition_ratio'][1]:>15.3f}")
print(f"{''*60}")
print(f"Verdict: {comp['verdict']}")
print(f"{'='*70}\n")
def main():
parser = argparse.ArgumentParser(description="TurboQuant Long-Session Quality Test")
parser.add_argument("--backend", choices=["ollama", "llama-server"], default="ollama")
parser.add_argument("--model", default="llama3", help="Model name")
parser.add_argument("--url", default="http://localhost:11434", help="Backend URL")
parser.add_argument("--kv-type", default="f16", help="KV cache type (llama-server only)")
parser.add_argument("--turns", type=int, default=50, help="Number of conversation turns")
parser.add_argument("--history-window", type=int, default=20, help="Turns of history to keep")
parser.add_argument("--timeout", type=int, default=120, help="Per-turn timeout in seconds")
parser.add_argument("--delay", type=float, default=0.5, help="Delay between turns in seconds")
parser.add_argument("--output", "-o", help="Output JSON file path")
parser.add_argument("--save-responses", action="store_true", help="Include full responses in output")
parser.add_argument("--compare", nargs=2, metavar=("FILE_A", "FILE_B"),
help="Compare two previously saved run reports")
args = parser.parse_args()
# Compare mode
if args.compare:
with open(args.compare[0]) as f:
report_a = json.load(f)
with open(args.compare[1]) as f:
report_b = json.load(f)
comp = compare_reports(report_a, report_b)
print_comparison(comp)
return
# Run mode
if requests is None:
print("ERROR: 'requests' package required. Install with: pip install requests")
sys.exit(1)
report = run_session(args)
print_report(report)
# Save report
output_path = args.output or f"benchmarks/long_session_{args.kv_type}_{int(time.time())}.json"
os.makedirs(os.path.dirname(output_path) or ".", exist_ok=True)
with open(output_path, "w") as f:
json.dump(report, f, indent=2)
print(f"Report saved to: {output_path}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,164 @@
#!/usr/bin/env python3
"""
Tests for benchmark comparison module (Issue #29).
Covers: ConfigEntry, ConfigResult, aggregation, comparison table,
demo mode, and config loading.
"""
import json
import os
import sys
import tempfile
import unittest
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "benchmarks"))
from compare_configs import (
ConfigEntry,
ConfigResult,
DEFAULT_CONFIGS,
aggregate,
build_comparison_table,
load_prompts,
pick_winner,
run_demo,
)
class TestConfigEntry(unittest.TestCase):
def test_default_values(self):
c = ConfigEntry(name="test", backend="ollama", model="gemma4", url="http://x")
self.assertEqual(c.kv_type, "f16")
self.assertFalse(c.layer_adaptive)
def test_to_dict(self):
c = ConfigEntry(name="test", backend="llama-server", model="g", url="http://x",
kv_type="turbo4", layer_adaptive=True)
d = c.to_dict()
self.assertEqual(d["kv_type"], "turbo4")
self.assertTrue(d["layer_adaptive"])
class TestDefaultConfigs(unittest.TestCase):
def test_four_configs(self):
self.assertEqual(len(DEFAULT_CONFIGS), 4)
def test_names(self):
names = [c.name for c in DEFAULT_CONFIGS]
self.assertIn("ollama-gemma4", names)
self.assertIn("llama-f16", names)
self.assertIn("llama-turbo4", names)
self.assertIn("llama-turbo4-adaptive", names)
def test_turbo4_adaptive_has_flag(self):
cfg = next(c for c in DEFAULT_CONFIGS if c.name == "llama-turbo4-adaptive")
self.assertTrue(cfg.layer_adaptive)
self.assertEqual(cfg.kv_type, "turbo4")
class TestAggregate(unittest.TestCase):
def _make_results(self, n_success: int, n_fail: int) -> list[dict]:
results = []
for i in range(n_success):
results.append({
"status": "success",
"ttft_s": 0.5 + i * 0.1,
"tokens_per_sec": 20.0 + i * 0.5,
"latency_s": 1.0 + i * 0.05,
})
for _ in range(n_fail):
results.append({"status": "failed", "latency_s": 0.5})
return results
def test_basic_aggregate(self):
results = self._make_results(5, 1)
cfg = ConfigEntry(name="test", backend="ollama", model="m", url="http://x")
agg = aggregate(results, cfg, peak_mb=100.0)
self.assertEqual(agg.success, 5)
self.assertEqual(agg.failed, 1)
self.assertEqual(agg.total_prompts, 6)
self.assertAlmostEqual(agg.peak_memory_mb, 100.0)
self.assertGreater(agg.avg_tok_per_sec, 0)
def test_no_success(self):
results = [{"status": "failed", "latency_s": 0.1}]
cfg = ConfigEntry(name="test", backend="ollama", model="m", url="http://x")
agg = aggregate(results, cfg, peak_mb=0.0)
self.assertEqual(agg.avg_tok_per_sec, 0.0)
self.assertIsNone(agg.avg_ttft_s)
class TestPickWinner(unittest.TestCase):
def test_highest_tps_wins(self):
configs = [
ConfigResult(config_name="slow", backend="o", model="m", kv_type="f",
total_prompts=5, success=5, failed=0, avg_ttft_s=1.0,
avg_tok_per_sec=10.0, avg_latency_s=2.0, peak_memory_mb=100),
ConfigResult(config_name="fast", backend="o", model="m", kv_type="f",
total_prompts=5, success=5, failed=0, avg_ttft_s=0.5,
avg_tok_per_sec=25.0, avg_latency_s=1.5, peak_memory_mb=100),
]
w = pick_winner(configs)
self.assertEqual(w.config_name, "fast")
self.assertTrue(w.winner)
def test_no_success_returns_first(self):
configs = [
ConfigResult(config_name="dead", backend="o", model="m", kv_type="f",
total_prompts=5, success=0, failed=5, avg_ttft_s=None,
avg_tok_per_sec=0.0, avg_latency_s=0.0, peak_memory_mb=0),
]
w = pick_winner(configs)
self.assertEqual(w.config_name, "dead")
class TestComparisonTable(unittest.TestCase):
def test_table_has_headers(self):
configs = [
ConfigResult(config_name="test-cfg", backend="o", model="m", kv_type="f",
total_prompts=5, success=5, failed=0, avg_ttft_s=0.5,
avg_tok_per_sec=20.0, avg_latency_s=1.5, peak_memory_mb=100),
]
w = pick_winner(configs)
table = build_comparison_table(configs)
self.assertIn("Config", table)
self.assertIn("tok/s", table)
self.assertIn("WINNER", table)
class TestDemoMode(unittest.TestCase):
def test_demo_produces_report(self):
with tempfile.NamedTemporaryFile(suffix=".json", delete=False) as f:
out_path = Path(f.name)
try:
report = run_demo(str(out_path))
self.assertEqual(report["mode"], "demo")
self.assertEqual(report["prompts_count"], 10)
self.assertEqual(len(report["configs"]), 4)
self.assertTrue(out_path.exists())
saved = json.loads(out_path.read_text())
self.assertIn("winner", saved)
finally:
out_path.unlink(missing_ok=True)
def test_demo_without_output(self):
report = run_demo()
self.assertIn("winner", report)
self.assertGreater(report["winner_tok_per_sec"], 0)
class TestLoadPrompts(unittest.TestCase):
def test_load_test_prompts(self):
prompts_file = Path(__file__).resolve().parent.parent / "benchmarks" / "test_prompts.json"
if prompts_file.exists():
prompts = load_prompts(str(prompts_file))
self.assertGreater(len(prompts), 0)
for p in prompts:
self.assertIn("prompt", p)
if __name__ == "__main__":
unittest.main()