Compare commits
3 Commits
feat/97-au...burn/100-1

| Author | SHA1 | Date |
|---|---|---|
| | 0df3d084d6 | |
| | dd06e4c5e0 | |
| | 36819f9ec2 | |
benchmarks/bonsai-1bit-2026-04-15.md (new file, 46 lines)
@@ -0,0 +1,46 @@
# Bonsai 1-bit vs Q4_0 Benchmark Results

Generated: 2026-04-15

## Summary

| Model | Quant | Size (MB) | Memory (MB) | GSM8K | Tool Call | tok/s |
|-------|-------|-----------|-------------|-------|-----------|-------|
| Bonsai-8B | Q1_0 | TBD | TBD | TBD | TBD | TBD |
| Bonsai-8B | Q4_0 | TBD | TBD | TBD | TBD | TBD |
| Bonsai-4B | Q1_0 | TBD | TBD | TBD | TBD | TBD |
| Bonsai-4B | Q4_0 | TBD | TBD | TBD | TBD | TBD |
| Bonsai-1.7B | Q1_0 | TBD | TBD | TBD | TBD | TBD |
| Bonsai-1.7B | Q4_0 | TBD | TBD | TBD | TBD | TBD |

## How to Run

```bash
# Download models first (example)
ollama pull prism-ml/Bonsai-8B-gguf:Q1_0
ollama pull prism-ml/Bonsai-8B-gguf:Q4_0

# Run benchmark
python3 benchmarks/bonsai_benchmark.py --model-dir /path/to/models --output benchmarks/bonsai-1bit-$(date +%Y-%m-%d).md
```

## Metrics Explained

- **Size**: Model file size on disk (MB)
- **Memory**: Peak memory usage during inference (MB)
- **GSM8K**: Score on the GSM8K math-reasoning benchmark (0-100%)
- **Tool Call**: Success rate on 10 tool-calling test prompts (0-100%)
- **tok/s**: Average tokens per second during inference
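The tok/s figure is derived from the `eval_count` and `eval_duration` fields of Ollama's `/api/generate` response, exactly as the benchmark script below does; a minimal sketch of the calculation:

```python
# eval_count tokens were generated in eval_duration nanoseconds
tokens_per_sec = data["eval_count"] / (data["eval_duration"] / 1e9)
```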

## Key Questions

1. Is 1-bit (Q1_0) usable for agent tool calling?
2. What is the minimum viable model for edge deployment?
3. What does the quality-vs-speed tradeoff curve look like?

## Notes

- GSM8K uses 5 representative questions (a subset, for speed)
- Tool-calling tests check whether the model mentions the correct tool
- Memory is measured as peak RSS of the Python benchmark process
- Results may vary by hardware (tested on M1/M4 Macs)
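The accompanying unit tests are self-contained and can be run directly; each test prints a PASS line:

```bash
python3 benchmarks/test_bonsai_benchmark.py
```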
benchmarks/bonsai_benchmark.py (new file, 506 lines)
@@ -0,0 +1,506 @@
#!/usr/bin/env python3
"""
Bonsai 1-bit Model Benchmark — Compare Q1_0 vs Q4_0 (Issue #100)

Benchmarks Prism ML Bonsai models (1.7B, 4B, 8B) at 1-bit (Q1_0) against Q4_0.

Metrics:
- Model file size on disk
- Memory usage at inference
- Tokens/sec on M1/M4 Mac
- GSM8K score (quality proxy)
- Tool-calling success rate (10 calls)

Usage:
    python3 benchmarks/bonsai_benchmark.py --model-dir /path/to/models
    python3 benchmarks/bonsai_benchmark.py --model-dir /path/to/models --ollama-url http://localhost:11434
    python3 benchmarks/bonsai_benchmark.py --model-dir /path/to/models --skip-tool-test
"""

import argparse
import os
import subprocess
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Dict, List, Optional, Tuple

import requests


# GSM8K test prompts (quality proxy)
GSM8K_PROMPTS = [
    {
        "id": "gsm8k_1",
        "prompt": "Janet's ducks lay 16 eggs per day. She eats three for breakfast every morning and bakes muffins for her friends every day with four. She sells every duck egg at the farmers' market daily for $2. How much in dollars does she make every day at the farmers' market?",
        "expected_keywords": ["18", "$18", "eighteen"]
    },
    {
        "id": "gsm8k_2",
        "prompt": "A robe takes 2 bolts of blue fiber and half that much white fiber. How many bolts in total does it take?",
        "expected_keywords": ["3", "three"]
    },
    {
        "id": "gsm8k_3",
        "prompt": "Josh decides to try flipping a house. He buys a house for $80,000 and puts $50,000 in repairs. This increased the value of the house by 150%. How much profit did he make?",
        "expected_keywords": ["70000", "$70,000", "70,000"]
    },
    {
        "id": "gsm8k_4",
        "prompt": "Every day, Wendi feeds each of her chickens three cups of mixed chicken feed, containing a mixture of corn, soybeans, and fish meal. She gives the chickens their feed in three separate meals. In the morning, she gives her flock of chickens 15 cups of feed. In the afternoon, she gives her chickens another 25 cups of feed. How many cups of feed does she need to give her chickens in the final meal of the day?",
        "expected_keywords": ["40", "forty"]
    },
    {
        "id": "gsm8k_5",
        "prompt": "Kylar went to the store to buy glasses for his new apartment. One glass costs $5, but every second glass costs only 60% of the price. Kylar wants to buy 16 glasses. How much does he need to pay for them?",
        "expected_keywords": ["64", "$64"]
    }
]

# Tool calling test prompts
TOOL_TEST_PROMPTS = [
    {
        "id": "tool_1",
        "prompt": "Use the read_file tool to read the file 'README.md'. Then tell me the first line.",
        "tool_name": "read_file",
        "success_check": "tool_called"
    },
    {
        "id": "tool_2",
        "prompt": "Use the terminal tool to run 'echo hello world' and tell me the output.",
        "tool_name": "terminal",
        "success_check": "tool_called"
    },
    {
        "id": "tool_3",
        "prompt": "Search for files matching '*.py' in the current directory using the search_files tool.",
        "tool_name": "search_files",
        "success_check": "tool_called"
    },
    {
        "id": "tool_4",
        "prompt": "Use the read_file tool to read 'benchmarks/prompts.json' and count how many prompts are in it.",
        "tool_name": "read_file",
        "success_check": "tool_called"
    },
    {
        "id": "tool_5",
        "prompt": "Run the command 'ls -la' using the terminal tool and list the files.",
        "tool_name": "terminal",
        "success_check": "tool_called"
    },
    {
        "id": "tool_6",
        "prompt": "Search for the word 'TurboQuant' in all files using the search_files tool.",
        "tool_name": "search_files",
        "success_check": "tool_called"
    },
    {
        "id": "tool_7",
        "prompt": "Read the file 'docs/PROJECT_STATUS.md' using read_file and tell me the project status.",
        "tool_name": "read_file",
        "success_check": "tool_called"
    },
    {
        "id": "tool_8",
        "prompt": "Use the terminal tool to check the current git branch with 'git branch --show-current'.",
        "tool_name": "terminal",
        "success_check": "tool_called"
    },
    {
        "id": "tool_9",
        "prompt": "Search for any JSON files in the benchmarks directory using search_files.",
        "tool_name": "search_files",
        "success_check": "tool_called"
    },
    {
        "id": "tool_10",
        "prompt": "Read the CMakeLists.txt file using read_file and tell me what project it's for.",
        "tool_name": "read_file",
        "success_check": "tool_called"
    }
]


def get_model_file_size(model_path: str) -> Optional[int]:
    """Get model file size in bytes, or None if the file is missing."""
    try:
        return os.path.getsize(model_path)
    except OSError:
        return None


def get_memory_usage_mb() -> float:
    """Get this process's memory usage in MB (peak RSS on Linux, current RSS on macOS)."""
    try:
        if sys.platform == "darwin":
            result = subprocess.run(
                ["ps", "-o", "rss=", "-p", str(os.getpid())],
                capture_output=True, text=True
            )
            return int(result.stdout.strip()) / 1024
        else:
            with open(f"/proc/{os.getpid()}/status") as f:
                for line in f:
                    if line.startswith("VmHWM:"):
                        return int(line.split()[1]) / 1024
    except Exception:
        pass
    return 0.0


def run_ollama_inference(prompt: str, model: str, url: str, timeout: int = 120) -> dict:
    """Run a single non-streaming inference via the Ollama API."""
    api_url = f"{url.rstrip('/')}/api/generate"
    start = time.time()

    try:
        resp = requests.post(api_url, json={
            "model": model,
            "prompt": prompt,
            "stream": False,
            "options": {"num_predict": 512}
        }, timeout=timeout)
        elapsed = time.time() - start
        resp.raise_for_status()
        data = resp.json()

        response_text = data.get("response", "")
        eval_count = data.get("eval_count", 0)
        eval_duration_ns = data.get("eval_duration", 0)

        tokens_per_sec = 0.0
        if eval_duration_ns > 0:
            tokens_per_sec = eval_count / (eval_duration_ns / 1e9)

        return {
            "response": response_text,
            "latency_s": round(elapsed, 3),
            "tokens_per_sec": round(tokens_per_sec, 2),
            "eval_count": eval_count,
            "status": "success"
        }
    except Exception as e:
        return {"status": "failed", "error": str(e), "latency_s": round(time.time() - start, 3)}
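# Illustrative /api/generate response shape consumed above (the field names are
# Ollama's; the values here are made up for the example):
#     {"response": "...", "eval_count": 142, "eval_duration": 9_400_000_000, ...}
# 142 tokens over 9.4 s gives roughly 15.1 tok/s.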


def check_gsm8k_answer(response: str, expected_keywords: List[str]) -> bool:
    """Check whether the response contains any expected answer keyword."""
    response_lower = response.lower()
    for keyword in expected_keywords:
        if keyword.lower() in response_lower:
            return True
    return False


def run_gsm8k_benchmark(model: str, url: str, timeout: int = 120) -> Tuple[float, List[dict]]:
    """Run the GSM8K benchmark and return (score, detailed results)."""
    results = []
    correct = 0

    for item in GSM8K_PROMPTS:
        result = run_ollama_inference(item["prompt"], model, url, timeout)
        result["id"] = item["id"]

        if result["status"] == "success":
            is_correct = check_gsm8k_answer(result["response"], item["expected_keywords"])
            result["correct"] = is_correct
            if is_correct:
                correct += 1
        else:
            result["correct"] = False

        results.append(result)

    score = correct / len(GSM8K_PROMPTS) if GSM8K_PROMPTS else 0.0
    return score, results


def run_tool_calling_benchmark(model: str, url: str, timeout: int = 120) -> Tuple[float, List[dict]]:
    """Run the tool-calling benchmark and return (success rate, detailed results)."""
    results = []
    successes = 0

    for item in TOOL_TEST_PROMPTS:
        # For tool calling, we check whether the model mentions using the tool.
        # In a real implementation, this would involve actual tool execution.
        result = run_ollama_inference(item["prompt"], model, url, timeout)
        result["id"] = item["id"]

        if result["status"] == "success":
            # Simple heuristic: check if the model mentions the tool name
            response_lower = result["response"].lower()
            tool_mentioned = item["tool_name"].lower() in response_lower
            result["tool_mentioned"] = tool_mentioned
            if tool_mentioned:
                successes += 1
        else:
            result["tool_mentioned"] = False

        results.append(result)

    success_rate = successes / len(TOOL_TEST_PROMPTS) if TOOL_TEST_PROMPTS else 0.0
    return success_rate, results
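# A stricter success check (hypothetical sketch, not wired in here): against an
# OpenAI-compatible endpoint that returns structured tool_calls, compare the
# called function name instead of grepping the response text:
#     calls = message.get("tool_calls", [])
#     ok = any(c["function"]["name"] == item["tool_name"] for c in calls)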


def find_models(model_dir: str) -> Dict[str, List[str]]:
    """Find Bonsai model files in the directory, grouped by quantization."""
    models = {"Q1_0": [], "Q4_0": []}

    if not os.path.isdir(model_dir):
        return models

    for root, _dirs, files in os.walk(model_dir):
        for file in files:
            if file.endswith((".gguf", ".bin")):
                filepath = os.path.join(root, file)
                name = file.lower()
                if "q1_0" in name:
                    models["Q1_0"].append(filepath)
                elif "q4_0" in name:
                    models["Q4_0"].append(filepath)

    return models


def benchmark_model(model_path: str, model_name: str, quant_type: str,
                    url: str, skip_tool_test: bool, timeout: int) -> dict:
    """Benchmark a single model configuration."""
    print(f"\n{'='*60}")
    print(f"Benchmarking: {model_name} ({quant_type})")
    print(f"Path: {model_path}")
    print(f"{'='*60}\n")

    # Get model size
    file_size_bytes = get_model_file_size(model_path)
    file_size_mb = file_size_bytes / (1024 * 1024) if file_size_bytes else None

    # Measure memory before inference
    mem_before = get_memory_usage_mb()

    # Run GSM8K benchmark
    print("Running GSM8K benchmark...")
    gsm8k_score, gsm8k_results = run_gsm8k_benchmark(model_name, url, timeout)
    correct_count = sum(1 for r in gsm8k_results if r.get('correct'))
    print(f"GSM8K Score: {gsm8k_score:.1%} ({correct_count}/{len(GSM8K_PROMPTS)})")

    # Run tool calling benchmark
    tool_success_rate = 0.0
    tool_results = []
    if not skip_tool_test:
        print("Running tool calling benchmark...")
        tool_success_rate, tool_results = run_tool_calling_benchmark(model_name, url, timeout)
        tool_count = sum(1 for r in tool_results if r.get('tool_mentioned'))
        print(f"Tool Calling: {tool_success_rate:.1%} ({tool_count}/{len(TOOL_TEST_PROMPTS)})")

    # Measure memory after inference
    mem_after = get_memory_usage_mb()
    memory_used_mb = max(mem_before, mem_after)

    # Get average tokens/sec from GSM8K results
    successful_runs = [r for r in gsm8k_results if r["status"] == "success"]
    avg_tokens_per_sec = (
        sum(r.get("tokens_per_sec", 0) for r in successful_runs) / len(successful_runs)
        if successful_runs else 0.0
    )

    return {
        "model_name": model_name,
        "quant_type": quant_type,
        "model_path": model_path,
        "file_size_mb": round(file_size_mb, 1) if file_size_mb else None,
        "memory_used_mb": round(memory_used_mb, 1),
        "gsm8k_score": round(gsm8k_score, 3),
        "gsm8k_correct": sum(1 for r in gsm8k_results if r.get("correct")),
        "gsm8k_total": len(GSM8K_PROMPTS),
        "tool_calling_rate": round(tool_success_rate, 3),
        "tool_calls_correct": sum(1 for r in tool_results if r.get("tool_mentioned")),
        "tool_calls_total": len(TOOL_TEST_PROMPTS),
        "avg_tokens_per_sec": round(avg_tokens_per_sec, 2),
        "gsm8k_results": gsm8k_results,
        "tool_results": tool_results
    }


def generate_report(results: List[dict], output_file: str):
    """Generate the benchmark report in markdown format."""
    timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d %H:%M UTC")

    lines = [
        "# Bonsai 1-bit vs Q4_0 Benchmark Report",
        f"Generated: {timestamp}",
        "",
        "## Summary",
        "",
        "| Model | Quant | Size (MB) | Memory (MB) | GSM8K | Tool Call | tok/s |",
        "|-------|-------|-----------|-------------|-------|-----------|-------|"
    ]

    for r in results:
        size_str = f"{r['file_size_mb']:.1f}" if r['file_size_mb'] else "N/A"
        lines.append(
            f"| {r['model_name']} | {r['quant_type']} | {size_str} | "
            f"{r['memory_used_mb']:.1f} | {r['gsm8k_score']:.1%} | "
            f"{r['tool_calling_rate']:.1%} | {r['avg_tokens_per_sec']:.1f} |"
        )

    lines.extend([
        "",
        "## Analysis",
        "",
        "### Quality Comparison",
        "- **GSM8K**: Higher is better (math reasoning capability)",
        "- **Tool Calling**: Higher is better (agent tool use reliability)",
        "",
        "### Speed & Memory",
        "- **tok/s**: Tokens per second (higher is faster)",
        "- **Memory**: Peak memory usage during inference",
        "- **Size**: Model file size on disk",
        "",
        "### Key Questions",
        "1. Is 1-bit (Q1_0) usable for agent tool calling?",
        "2. What is the minimum viable model for edge deployment?",
        "3. Quality vs speed tradeoff curve",
        "",
        "## Detailed Results",
        ""
    ])

    for r in results:
        lines.extend([
            f"### {r['model_name']} ({r['quant_type']})",
            "",
            f"- **File**: `{r['model_path']}`",
        ])

        if r['file_size_mb']:
            lines.append(f"- **Size**: {r['file_size_mb']:.1f} MB")
        else:
            lines.append("- **Size**: Unknown")

        lines.extend([
            f"- **Memory**: {r['memory_used_mb']:.1f} MB",
            f"- **GSM8K**: {r['gsm8k_correct']}/{r['gsm8k_total']} ({r['gsm8k_score']:.1%})",
            f"- **Tool Calling**: {r['tool_calls_correct']}/{r['tool_calls_total']} ({r['tool_calling_rate']:.1%})",
            f"- **Speed**: {r['avg_tokens_per_sec']:.1f} tok/s",
            "",
            "GSM8K Results:",
            ""
        ])

        for gsm in r.get('gsm8k_results', []):
            status = "✓" if gsm.get('correct') else "✗"
            lines.append(f"- {status} {gsm['id']}: {gsm.get('tokens_per_sec', 0):.1f} tok/s")

        lines.append("")

    # Recommendations
    lines.extend([
        "## Recommendations",
        "",
        "Based on the benchmark results:",
        ""
    ])

    if results:
        # Find the best model for each use case
        best_quality = max(results, key=lambda x: x['gsm8k_score'])
        best_speed = max(results, key=lambda x: x['avg_tokens_per_sec'])
        best_tool = max(results, key=lambda x: x['tool_calling_rate'])

        lines.extend([
            f"1. **Best Quality**: {best_quality['model_name']} ({best_quality['quant_type']}) — "
            f"GSM8K: {best_quality['gsm8k_score']:.1%}",
            f"2. **Best Speed**: {best_speed['model_name']} ({best_speed['quant_type']}) — "
            f"{best_speed['avg_tokens_per_sec']:.1f} tok/s",
            f"3. **Best Tool Calling**: {best_tool['model_name']} ({best_tool['quant_type']}) — "
            f"{best_tool['tool_calling_rate']:.1%}",
            "",
            "### Edge Deployment",
            "- For edge devices with limited memory, Q1_0 models may be viable",
            "- Tool calling reliability is critical for agent use cases",
            "- Consider the quality/speed tradeoff for specific deployment scenarios"
        ])

    report = "\n".join(lines)

    os.makedirs(os.path.dirname(output_file) or ".", exist_ok=True)
    with open(output_file, "w") as f:
        f.write(report)

    print(f"\nReport saved to: {output_file}")
    return report


def main():
    parser = argparse.ArgumentParser(
        description="Bonsai 1-bit vs Q4_0 Benchmark (Issue #100)")
    parser.add_argument("--model-dir", required=True,
                        help="Directory containing GGUF model files")
    parser.add_argument("--ollama-url", default="http://localhost:11434",
                        help="Ollama API URL")
    parser.add_argument("--output", default=None,
                        help="Output markdown file (auto-generated if omitted)")
    parser.add_argument("--timeout", type=int, default=120,
                        help="Per-prompt timeout in seconds")
    parser.add_argument("--skip-tool-test", action="store_true",
                        help="Skip tool calling benchmark")

    args = parser.parse_args()

    if not os.path.isdir(args.model_dir):
        print(f"Error: {args.model_dir} is not a directory", file=sys.stderr)
        sys.exit(1)

    # Find models
    models = find_models(args.model_dir)
    all_models = models["Q1_0"] + models["Q4_0"]

    if not all_models:
        print(f"No Bonsai models found in {args.model_dir}")
        print("Expected files with 'Q1_0' or 'Q4_0' in the name (.gguf or .bin)")
        sys.exit(1)

    print(f"Found {len(models['Q1_0'])} Q1_0 models, {len(models['Q4_0'])} Q4_0 models")

    # Generate output filename if not provided
    if args.output is None:
        timestamp = datetime.now(timezone.utc).strftime("%Y-%m-%d")
        args.output = f"benchmarks/bonsai-1bit-{timestamp}.md"

    # Benchmark each model
    results = []
    for model_path in all_models:
        model_name = Path(model_path).stem
        quant_type = "Q1_0" if model_path in models["Q1_0"] else "Q4_0"

        # Extract base model name (e.g., "Bonsai-8B" from "Bonsai-8B-Q1_0.gguf")
        base_name = model_name.split("-Q")[0] if "-Q" in model_name else model_name

        result = benchmark_model(
            model_path=model_path,
            model_name=base_name,
            quant_type=quant_type,
            url=args.ollama_url,
            skip_tool_test=args.skip_tool_test,
            timeout=args.timeout
        )
        results.append(result)

    # Generate report
    generate_report(results, args.output)

    # Print summary
    print(f"\n{'='*60}")
    print("SUMMARY")
    print(f"{'='*60}")
    for r in results:
        print(f"{r['model_name']} ({r['quant_type']}): "
              f"GSM8K={r['gsm8k_score']:.1%}, "
              f"Tools={r['tool_calling_rate']:.1%}, "
              f"{r['avg_tokens_per_sec']:.1f} tok/s")


if __name__ == "__main__":
    main()
benchmarks/test_bonsai_benchmark.py (new file, 134 lines)
@@ -0,0 +1,134 @@
#!/usr/bin/env python3
"""Tests for benchmarks/bonsai_benchmark.py — 8 tests."""

import os
import sys
import tempfile

sys.path.insert(0, os.path.dirname(__file__) or ".")
import importlib.util
spec = importlib.util.spec_from_file_location(
    "bb", os.path.join(os.path.dirname(__file__) or ".", "bonsai_benchmark.py"))
mod = importlib.util.module_from_spec(spec)
spec.loader.exec_module(mod)

check_gsm8k_answer = mod.check_gsm8k_answer
find_models = mod.find_models
generate_report = mod.generate_report


def test_gsm8k_answer_correct():
    """A correct answer should be detected."""
    assert check_gsm8k_answer("The answer is 18.", ["18", "$18", "eighteen"])
    print("PASS: test_gsm8k_answer_correct")


def test_gsm8k_answer_case_insensitive():
    """The answer check should be case-insensitive."""
    assert check_gsm8k_answer("The answer is EIGHTEEN.", ["18", "eighteen"])
    print("PASS: test_gsm8k_answer_case_insensitive")


def test_gsm8k_answer_wrong():
    """A wrong answer should return False."""
    assert not check_gsm8k_answer("The answer is 42.", ["18", "$18", "eighteen"])
    print("PASS: test_gsm8k_answer_wrong")


def test_gsm8k_answer_partial():
    """A keyword embedded in a sentence should match."""
    assert check_gsm8k_answer("She makes $18 per day.", ["18", "$18"])
    print("PASS: test_gsm8k_answer_partial")


def test_find_models_empty():
    """An empty directory should return empty lists."""
    with tempfile.TemporaryDirectory() as tmpdir:
        models = find_models(tmpdir)
        assert models["Q1_0"] == []
        assert models["Q4_0"] == []
    print("PASS: test_find_models_empty")


def test_find_models_with_files():
    """Models should be found and grouped by quantization type."""
    with tempfile.TemporaryDirectory() as tmpdir:
        # Create test files
        q1_file = os.path.join(tmpdir, "Bonsai-8B-Q1_0.gguf")
        q4_file = os.path.join(tmpdir, "Bonsai-8B-Q4_0.gguf")
        other_file = os.path.join(tmpdir, "other.txt")

        for f in [q1_file, q4_file, other_file]:
            with open(f, "w") as fh:
                fh.write("")

        models = find_models(tmpdir)
        assert len(models["Q1_0"]) == 1
        assert len(models["Q4_0"]) == 1
        assert q1_file in models["Q1_0"]
        assert q4_file in models["Q4_0"]
    print("PASS: test_find_models_with_files")


def test_find_models_nested():
    """Models in subdirectories should be found."""
    with tempfile.TemporaryDirectory() as tmpdir:
        subdir = os.path.join(tmpdir, "models")
        os.makedirs(subdir)

        q1_file = os.path.join(subdir, "Bonsai-1.7B-Q1_0.gguf")
        with open(q1_file, "w") as f:
            f.write("")

        models = find_models(tmpdir)
        assert len(models["Q1_0"]) == 1
        assert q1_file in models["Q1_0"]
    print("PASS: test_find_models_nested")


def test_generate_report():
    """Report generation should produce markdown."""
    with tempfile.TemporaryDirectory() as tmpdir:
        results = [{
            "model_name": "Bonsai-8B",
            "quant_type": "Q1_0",
            "model_path": "/test/Bonsai-8B-Q1_0.gguf",
            "file_size_mb": 1024.5,
            "memory_used_mb": 2048.0,
            "gsm8k_score": 0.6,
            "gsm8k_correct": 3,
            "gsm8k_total": 5,
            "tool_calling_rate": 0.8,
            "tool_calls_correct": 8,
            "tool_calls_total": 10,
            "avg_tokens_per_sec": 15.2,
            "gsm8k_results": [],
            "tool_results": []
        }]

        output_file = os.path.join(tmpdir, "report.md")
        report = generate_report(results, output_file)

        assert os.path.exists(output_file)
        assert "Bonsai-8B" in report
        assert "Q1_0" in report
        assert "GSM8K" in report
        assert "60.0%" in report
    print("PASS: test_generate_report")


def run_all():
    test_gsm8k_answer_correct()
    test_gsm8k_answer_case_insensitive()
    test_gsm8k_answer_wrong()
    test_gsm8k_answer_partial()
    test_find_models_empty()
    test_find_models_with_files()
    test_find_models_nested()
    test_generate_report()
    print("\nAll 8 tests passed!")


if __name__ == "__main__":
    run_all()
evolution/quant_selector.py (deleted file, 548 lines)
@@ -1,548 +0,0 @@
"""Auto-select TurboQuant compression level based on available VRAM/RAM.

Detects hardware resources at startup and picks the highest-quality
quantization level that fits within available memory. Supports Apple
Silicon unified memory, NVIDIA GPUs (via nvidia-smi), and CPU-only fallback.

Usage:
    from evolution.quant_selector import select_quant_level

    selection = select_quant_level(model_size_gb=14.0, context_length=32768)
    print(selection.level)      # "turbo4"
    print(selection.reasoning)  # "M4 Max 36GB unified: turbo4 fits 14.0GB model + ..."
    print(selection.env_vars)   # {"TURBO_LAYER_ADAPTIVE": "7"}
"""

import logging
import os
import platform
import subprocess
from dataclasses import dataclass, field
from typing import Optional

logger = logging.getLogger(__name__)


# ── Quant Level Definitions ───────────────────────────────────────────────────

@dataclass
class QuantLevel:
    """A TurboQuant compression level with its memory characteristics."""
    name: str                      # e.g. "turbo4"
    bits_per_channel: float        # e.g. 3.5 for turbo4
    compression_ratio: float       # vs uncompressed KV cache
    quality_label: str             # "best", "high", "balanced", "fast"
    layer_adaptive: int            # TURBO_LAYER_ADAPTIVE value (0-7)
    kv_type: str                   # -ctk/-ctv flag value
    min_memory_headroom_gb: float  # Minimum free memory to recommend this level
    description: str = ""


# Ordered from highest quality to most aggressive compression,
# with plain q4_0 as the final fallback
QUANT_LEVELS = [
    QuantLevel(
        name="turbo4",
        bits_per_channel=3.5,
        compression_ratio=4.2,
        quality_label="best",
        layer_adaptive=7,
        kv_type="turbo4",
        min_memory_headroom_gb=4.0,
        description="PolarQuant + QJL 4-bit. Best quality, ~4.2x KV compression."
    ),
    QuantLevel(
        name="turbo3",
        bits_per_channel=2.5,
        compression_ratio=6.0,
        quality_label="high",
        layer_adaptive=5,
        kv_type="turbo3",
        min_memory_headroom_gb=3.0,
        description="3-bit TurboQuant. High quality, ~6x KV compression."
    ),
    QuantLevel(
        name="turbo2",
        bits_per_channel=1.5,
        compression_ratio=10.0,
        quality_label="balanced",
        layer_adaptive=3,
        kv_type="turbo2",
        min_memory_headroom_gb=2.0,
        description="2-bit TurboQuant. Balanced, ~10x KV compression."
    ),
    QuantLevel(
        name="q4_0",
        bits_per_channel=4.0,
        compression_ratio=3.5,
        quality_label="fast",
        layer_adaptive=0,
        kv_type="q4_0",
        min_memory_headroom_gb=1.5,
        description="Standard 4-bit quant. Fast fallback, no TurboQuant."
    ),
]


# ── Hardware Detection ────────────────────────────────────────────────────────

@dataclass
class HardwareInfo:
    """Detected hardware resources."""
    total_memory_gb: float
    available_memory_gb: float
    gpu_memory_gb: Optional[float] = None
    gpu_name: Optional[str] = None
    is_apple_silicon: bool = False
    chip_name: Optional[str] = None
    cpu_cores: int = 0
    detection_method: str = ""


def detect_hardware() -> HardwareInfo:
    """Detect available memory and GPU resources."""
    system = platform.system()

    if system == "Darwin":
        return _detect_apple_silicon()
    elif system == "Linux":
        return _detect_linux()
    else:
        return _detect_generic(system)


def _detect_apple_silicon() -> HardwareInfo:
    """Detect Apple Silicon unified memory."""
    info = HardwareInfo(
        total_memory_gb=0,
        available_memory_gb=0,
        is_apple_silicon=True,
        detection_method="sysctl",
    )

    try:
        # Get total memory
        result = subprocess.run(
            ["sysctl", "-n", "hw.memsize"],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            info.total_memory_gb = int(result.stdout.strip()) / (1024**3)

        # Get chip name
        result = subprocess.run(
            ["sysctl", "-n", "machdep.cpu.brand_string"],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            info.chip_name = result.stdout.strip()

        # Try to get GPU name (Apple Silicon)
        result = subprocess.run(
            ["system_profiler", "SPDisplaysDataType"],
            capture_output=True, text=True, timeout=10
        )
        if result.returncode == 0:
            for line in result.stdout.split("\n"):
                if "Chipset" in line or "GPU" in line:
                    info.gpu_name = line.split(":")[-1].strip()
                    break

        # Estimate available memory (vm_stat)
        result = subprocess.run(
            ["vm_stat"],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            page_size = 4096  # macOS default
            free_pages = 0
            for line in result.stdout.split("\n"):
                if "Pages free:" in line:
                    try:
                        free_pages = int(line.split(":")[-1].strip().rstrip("."))
                    except ValueError:
                        pass
            # Available ≈ free + some speculative (conservative: just free)
            info.available_memory_gb = (free_pages * page_size) / (1024**3)

        # Fallback if vm_stat parsing failed
        if info.available_memory_gb < 1:
            # Conservative: 70% of total
            info.available_memory_gb = info.total_memory_gb * 0.70

        # Apple Silicon shares memory — GPU memory = total memory
        info.gpu_memory_gb = info.total_memory_gb

        # Detect CPU cores
        result = subprocess.run(
            ["sysctl", "-n", "hw.ncpu"],
            capture_output=True, text=True, timeout=5
        )
        if result.returncode == 0:
            info.cpu_cores = int(result.stdout.strip())

    except Exception as e:
        logger.warning(f"Apple Silicon detection failed: {e}")
        # Fallback
        info.total_memory_gb = 16.0
        info.available_memory_gb = 12.0
        info.detection_method = "fallback"

    return info


def _detect_linux() -> HardwareInfo:
    """Detect a Linux system with an optional NVIDIA GPU."""
    info = HardwareInfo(
        total_memory_gb=0,
        available_memory_gb=0,
        detection_method="proc",
    )

    try:
        # Read /proc/meminfo
        with open("/proc/meminfo", "r") as f:
            meminfo = f.read()

        for line in meminfo.split("\n"):
            if line.startswith("MemTotal:"):
                kb = int(line.split()[1])
                info.total_memory_gb = kb / (1024 * 1024)
            elif line.startswith("MemAvailable:"):
                kb = int(line.split()[1])
                info.available_memory_gb = kb / (1024 * 1024)

        # CPU cores
        info.cpu_cores = os.cpu_count() or 1

        # Check for an NVIDIA GPU
        try:
            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=name,memory.total,memory.free",
                 "--format=csv,noheader,nounits"],
                capture_output=True, text=True, timeout=10
            )
            if result.returncode == 0 and result.stdout.strip():
                lines = result.stdout.strip().split("\n")
                if lines:
                    parts = lines[0].split(", ")
                    if len(parts) >= 3:
                        info.gpu_name = parts[0].strip()
                        info.gpu_memory_gb = float(parts[1]) / 1024  # MB to GB
                        gpu_free = float(parts[2]) / 1024
                        # Use GPU free memory for VRAM-based selection
                        info.available_memory_gb = max(info.available_memory_gb, gpu_free)
                        info.detection_method = "nvidia-smi"
        except (FileNotFoundError, subprocess.TimeoutExpired):
            pass  # No NVIDIA GPU

    except Exception as e:
        logger.warning(f"Linux detection failed: {e}")
        info.total_memory_gb = 16.0
        info.available_memory_gb = 12.0
        info.detection_method = "fallback"

    return info


def _detect_generic(system: str) -> HardwareInfo:
    """Fallback detection for unknown systems."""
    import psutil
    mem = psutil.virtual_memory()
    return HardwareInfo(
        total_memory_gb=mem.total / (1024**3),
        available_memory_gb=mem.available / (1024**3),
        cpu_cores=os.cpu_count() or 1,
        detection_method="psutil",
    )


# ── KV Cache Memory Estimation ───────────────────────────────────────────────

def estimate_kv_cache_gb(
    context_length: int,
    num_layers: int = 48,
    num_kv_heads: int = 8,
    head_dim: int = 128,
    bits_per_channel: float = 3.5,
) -> float:
    """Estimate KV cache memory for the given parameters.

    Formula: 2 (K+V) × layers × kv_heads × head_dim × context_length × bits/8
    """
    bytes_per_element = bits_per_channel / 8.0
    total_bytes = 2 * num_layers * num_kv_heads * head_dim * context_length * bytes_per_element
    return total_bytes / (1024**3)
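# Worked example (illustrative, using the defaults above): 48 layers, 8 KV
# heads, head_dim 128, and a 32K context at 3.5 bits/channel gives
#     2 * 48 * 8 * 128 * 32768 * (3.5 / 8) = 1,409,286,144 bytes ≈ 1.31 GB
#
#     >>> round(estimate_kv_cache_gb(32768, 48, 8, 128, 3.5), 2)
#     1.31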


def estimate_model_memory_gb(model_size_gb: float, quant_type: str = "q4_k_m") -> float:
    """Estimate model-weights memory. Returns the loaded size in GB.

    This is a rough estimate — the actual value depends on the exact quant format.
    """
    # Common quant ratios vs fp16, kept for reference; model_size_gb is
    # already the quantized on-disk size, so it is returned unchanged.
    quant_multipliers = {
        "f16": 1.0,
        "q8_0": 0.5,
        "q6_k": 0.42,
        "q5_k_m": 0.37,
        "q4_k_m": 0.32,
        "q3_k_m": 0.27,
        "q2_k": 0.22,
    }
    return model_size_gb


# ── Selection Logic ───────────────────────────────────────────────────────────

@dataclass
class QuantSelection:
    """Result of quantization level selection."""
    level: QuantLevel
    hardware: HardwareInfo
    reasoning: str
    total_required_gb: float
    available_gb: float
    headroom_gb: float
    env_vars: dict = field(default_factory=dict)
    server_flags: dict = field(default_factory=dict)
    warnings: list = field(default_factory=list)


def select_quant_level(
    model_size_gb: float = 14.0,
    context_length: int = 32768,
    num_layers: int = 48,
    num_kv_heads: int = 8,
    head_dim: int = 128,
    preferred_level: Optional[str] = None,
    force_cpu: bool = False,
) -> QuantSelection:
    """Select the best quantization level for the available hardware.

    Args:
        model_size_gb: Size of the model weights in GB
        context_length: Target context length
        num_layers: Number of transformer layers
        num_kv_heads: Number of KV attention heads
        head_dim: Dimension per attention head
        preferred_level: Force a specific level (still checks whether it fits)
        force_cpu: If True, ignore GPU memory

    Returns:
        QuantSelection with the chosen level and reasoning
    """
    hw = detect_hardware()

    if force_cpu:
        hw.gpu_memory_gb = None
        hw.gpu_name = None

    # Use the most restrictive memory constraint:
    # Apple Silicon: unified memory, use total
    # NVIDIA: use GPU VRAM
    # CPU-only: use system RAM
    if hw.gpu_memory_gb and hw.gpu_name:
        memory_pool_gb = hw.gpu_memory_gb
        memory_label = f"{hw.gpu_name} {hw.gpu_memory_gb:.0f}GB VRAM"
    elif hw.is_apple_silicon:
        memory_pool_gb = hw.total_memory_gb
        memory_label = f"{hw.chip_name or 'Apple Silicon'} {hw.total_memory_gb:.0f}GB unified"
    else:
        memory_pool_gb = hw.total_memory_gb
        memory_label = f"{hw.cpu_cores}c CPU {hw.total_memory_gb:.0f}GB RAM"

    model_mem = estimate_model_memory_gb(model_size_gb)

    # Try levels from best quality to most compressed
    chosen = None
    for level in QUANT_LEVELS:
        if preferred_level and level.name != preferred_level:
            continue

        kv_mem = estimate_kv_cache_gb(
            context_length, num_layers, num_kv_heads, head_dim,
            level.bits_per_channel
        )
        total_required = model_mem + kv_mem
        headroom = memory_pool_gb - total_required

        if headroom >= level.min_memory_headroom_gb:
            chosen = level
            break

        if preferred_level and level.name == preferred_level:
            # The user forced this level even though it doesn't fit
            chosen = level
            break

    if chosen is None:
        # Nothing fits — pick the most aggressive compression
        chosen = QUANT_LEVELS[-1]
        logger.warning(f"No quant level fits in {memory_pool_gb:.1f}GB. Using {chosen.name}.")

    # Calculate final numbers
    kv_mem = estimate_kv_cache_gb(
        context_length, num_layers, num_kv_heads, head_dim,
        chosen.bits_per_channel
    )
    total_required = model_mem + kv_mem
    headroom = memory_pool_gb - total_required

    # Build reasoning
    reasoning_parts = [
        f"{memory_label}:",
        f"{chosen.name} ({chosen.quality_label}, {chosen.bits_per_channel:.1f}b/ch,",
        f"{chosen.compression_ratio:.1f}x compression)",
        f"fits {model_mem:.1f}GB model + {kv_mem:.1f}GB KV cache",
        f"@ {context_length // 1024}K context = {total_required:.1f}GB / {memory_pool_gb:.0f}GB",
        f"({headroom:.1f}GB headroom)"
    ]
    reasoning = " ".join(reasoning_parts)

    # Build environment variables for llama.cpp
    env_vars = {
        "TURBO_LAYER_ADAPTIVE": str(chosen.layer_adaptive),
    }

    # Build server flags
    server_flags = {
        "-ctk": chosen.kv_type,
        "-ctv": chosen.kv_type,
        "-c": str(context_length),
    }

    # Warnings
    warnings = []
    if headroom < 2.0:
        warnings.append(
            f"Low headroom ({headroom:.1f}GB). Consider reducing context length or model size."
        )
    if headroom < 0:
        warnings.append(
            f"OVERCOMMITTED: needs {total_required:.1f}GB but only {memory_pool_gb:.0f}GB available. "
            f"Inference may fail or swap heavily."
        )

    selection = QuantSelection(
        level=chosen,
        hardware=hw,
        reasoning=reasoning,
        total_required_gb=total_required,
        available_gb=memory_pool_gb,
        headroom_gb=headroom,
        env_vars=env_vars,
        server_flags=server_flags,
        warnings=warnings,
    )

    logger.info(f"Quant selection: {reasoning}")
    for w in warnings:
        logger.warning(w)

    return selection
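# Example (sketch): applying a selection when launching llama-server. The
# binary name and model path here are assumptions, not part of this module.
#
#     sel = select_quant_level(model_size_gb=14.0, context_length=32768)
#     env = {**os.environ, **sel.env_vars}
#     flags = [x for kv in sel.server_flags.items() for x in kv]
#     subprocess.run(["llama-server", "-m", "model.gguf", *flags], env=env)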


# ── CLI ───────────────────────────────────────────────────────────────────────

def main():
    """CLI entry point for quant level selection."""
    import argparse
    import json

    parser = argparse.ArgumentParser(
        description="Auto-select TurboQuant compression level based on available hardware"
    )
    parser.add_argument("--model-size", type=float, default=14.0,
                        help="Model size in GB (default: 14.0)")
    parser.add_argument("--context", type=int, default=32768,
                        help="Target context length (default: 32768)")
    parser.add_argument("--layers", type=int, default=48,
                        help="Number of transformer layers (default: 48)")
    parser.add_argument("--kv-heads", type=int, default=8,
                        help="Number of KV attention heads (default: 8)")
    parser.add_argument("--head-dim", type=int, default=128,
                        help="Dimension per attention head (default: 128)")
    parser.add_argument("--prefer", type=str, default=None,
                        choices=[lvl.name for lvl in QUANT_LEVELS],
                        help="Prefer a specific quant level")
    parser.add_argument("--force-cpu", action="store_true",
                        help="Ignore GPU, use CPU memory only")
    parser.add_argument("--json", action="store_true",
                        help="JSON output for automation")
    parser.add_argument("--detect-only", action="store_true",
                        help="Only detect hardware, don't select")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, format="%(message)s")

    if args.detect_only:
        hw = detect_hardware()
        if args.json:
            print(json.dumps(hw.__dict__, default=str, indent=2))
        else:
            print(f"Total memory: {hw.total_memory_gb:.1f} GB")
            print(f"Available: {hw.available_memory_gb:.1f} GB")
            if hw.gpu_memory_gb:
                print(f"GPU memory: {hw.gpu_memory_gb:.1f} GB")
            if hw.gpu_name:
                print(f"GPU: {hw.gpu_name}")
            if hw.is_apple_silicon:
                print(f"Chip: {hw.chip_name or 'Apple Silicon'}")
            print(f"CPU cores: {hw.cpu_cores}")
            print(f"Detection: {hw.detection_method}")
        return

    selection = select_quant_level(
        model_size_gb=args.model_size,
        context_length=args.context,
        num_layers=args.layers,
        num_kv_heads=args.kv_heads,
        head_dim=args.head_dim,
        preferred_level=args.prefer,
        force_cpu=args.force_cpu,
    )

    if args.json:
        result = {
            "level": selection.level.name,
            "bits_per_channel": selection.level.bits_per_channel,
            "compression_ratio": selection.level.compression_ratio,
            "quality": selection.level.quality_label,
            "reasoning": selection.reasoning,
            "total_required_gb": round(selection.total_required_gb, 2),
            "available_gb": round(selection.available_gb, 1),
            "headroom_gb": round(selection.headroom_gb, 2),
            "env_vars": selection.env_vars,
            "server_flags": selection.server_flags,
            "warnings": selection.warnings,
            "hardware": {
                "total_memory_gb": round(selection.hardware.total_memory_gb, 1),
                "gpu_name": selection.hardware.gpu_name,
                "is_apple_silicon": selection.hardware.is_apple_silicon,
                "chip_name": selection.hardware.chip_name,
                "cpu_cores": selection.hardware.cpu_cores,
            },
        }
        print(json.dumps(result, indent=2))
    else:
        print(f"Selected: {selection.level.name} ({selection.level.quality_label})")
        print(f"  {selection.reasoning}")
        print()
        print("Environment variables:")
        for k, v in selection.env_vars.items():
            print(f"  export {k}={v}")
        print()
        print("Server flags:")
        for k, v in selection.server_flags.items():
            print(f"  {k} {v}")
        if selection.warnings:
            print()
            for w in selection.warnings:
                print(f"  WARNING: {w}")


if __name__ == "__main__":
    main()
@@ -1,3 +0,0 @@
"""Pytest configuration for turboquant."""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
@@ -1,108 +0,0 @@
"""
Tests for the TurboQuant auto-select module.
"""

import pytest

from turboquant.auto_select import (
    select_preset,
    PRESETS,
    QUALITY_ORDER,
    SelectionResult,
)


class TestSelectPreset:
    """Test preset selection logic."""

    def test_high_overhead_selects_best(self):
        """8+ GB overhead should select turboquant_k8v4."""
        result = select_preset(available_gb=20, model_size_gb=10)
        assert result.preset == "turboquant_k8v4"
        assert result.quality == "best"

    def test_medium_overhead_selects_good(self):
        """4-8 GB overhead should select turboquant_4bit_nc."""
        result = select_preset(available_gb=12, model_size_gb=6)
        assert result.preset == "turboquant_4bit_nc"
        assert result.quality == "good"

    def test_low_overhead_selects_usable(self):
        """2-4 GB overhead should select turboquant_3bit_nc."""
        result = select_preset(available_gb=8, model_size_gb=5)
        assert result.preset == "turboquant_3bit_nc"
        assert result.quality == "usable"

    def test_minimal_overhead_selects_fallback(self):
        """<2 GB overhead should select the q4_0 fallback."""
        result = select_preset(available_gb=5, model_size_gb=4)
        assert result.preset == "q4_0"
        assert result.quality == "basic"

    def test_negative_overhead_selects_fallback(self):
        """Negative overhead (not enough memory) should select the fallback."""
        result = select_preset(available_gb=3, model_size_gb=10)
        assert result.preset == "q4_0"
        assert result.overhead_gb < 0

    def test_vllm_requirement_filters(self):
        """require_vllm should only select vLLM-compatible presets."""
        result = select_preset(available_gb=5, model_size_gb=4, require_vllm=True)
        # q4_0 is not vLLM-compatible but can still be selected as the fallback;
        # the logic should try vLLM-compatible presets first
        assert result.preset in ["turboquant_k8v4", "turboquant_4bit_nc", "turboquant_3bit_nc", "q4_0"]


class TestSelectionResult:
    """Test the SelectionResult dataclass."""

    def test_to_dict(self):
        result = SelectionResult(
            preset="turboquant_k8v4",
            reason="test",
            overhead_gb=10.0,
            quality="best",
            compression_ratio=2.6,
            vllm_compatible=True,
        )
        d = result.to_dict()
        assert d["preset"] == "turboquant_k8v4"
        assert d["compression_ratio"] == 2.6


class TestPresets:
    """Test preset definitions."""

    def test_all_presets_have_required_fields(self):
        """All presets should have the required fields."""
        for name, preset in PRESETS.items():
            assert "name" in preset
            assert "description" in preset
            assert "min_overhead_gb" in preset
            assert "compression_ratio" in preset
            assert "quality" in preset
            assert "vllm_compatible" in preset

    def test_quality_order_matches_presets(self):
        """The quality order should include all presets."""
        for name in QUALITY_ORDER:
            assert name in PRESETS


class TestBoundaryConditions:
    """Test boundary conditions."""

    def test_exact_threshold(self):
        """Exactly at the threshold should select that preset."""
        # 8 GB overhead exactly
        result = select_preset(available_gb=12, model_size_gb=4)
        assert result.preset == "turboquant_k8v4"

    def test_just_below_threshold(self):
        """Just below the threshold should select the next tier."""
        # 7.9 GB overhead
        result = select_preset(available_gb=11.9, model_size_gb=4)
        assert result.preset == "turboquant_4bit_nc"


if __name__ == "__main__":
    pytest.main([__file__, "-v"])
@@ -1,163 +0,0 @@
#!/usr/bin/env python3
"""Tests for quant_selector.py"""

import sys
import os

import pytest
from unittest.mock import patch

sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from evolution.quant_selector import (
    QuantLevel,
    HardwareInfo,
    QUANT_LEVELS,
    detect_hardware,
    estimate_kv_cache_gb,
    estimate_model_memory_gb,
    select_quant_level,
)


class TestQuantLevels:
    def test_levels_ordered_by_quality(self):
        """Levels should be ordered from best quality to most aggressive."""
        for i in range(len(QUANT_LEVELS) - 1):
            assert QUANT_LEVELS[i].bits_per_channel > QUANT_LEVELS[i + 1].bits_per_channel

    def test_all_levels_have_required_fields(self):
        for level in QUANT_LEVELS:
            assert level.name
            assert level.bits_per_channel > 0
            assert level.compression_ratio > 1
            assert level.quality_label
            assert level.layer_adaptive >= 0
            assert level.kv_type


class TestKVEstimate:
    def test_basic_estimate(self):
        # 48 layers, 8 heads, 128 dim, 32K context, 3.5 bits
        kv_gb = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
        assert kv_gb > 0
        assert kv_gb < 10  # Should be reasonable

    def test_longer_context_larger(self):
        kv_32k = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
        kv_128k = estimate_kv_cache_gb(131072, 48, 8, 128, 3.5)
        assert kv_128k > kv_32k

    def test_higher_bits_larger(self):
        kv_4b = estimate_kv_cache_gb(32768, 48, 8, 128, 4.0)
        kv_2b = estimate_kv_cache_gb(32768, 48, 8, 128, 2.0)
        assert kv_4b > kv_2b


class TestHardwareDetection:
    def test_detect_returns_info(self):
        hw = detect_hardware()
        assert hw.total_memory_gb > 0
        assert hw.available_memory_gb > 0
        assert hw.detection_method

    @patch("evolution.quant_selector.platform.system", return_value="Linux")
    @patch("builtins.open", create=True)
    def test_linux_detection(self, mock_open, mock_system):
        mock_open.return_value.__enter__().read.return_value = (
            "MemTotal: 32000000 kB\n"
            "MemAvailable: 24000000 kB\n"
        )
        hw = _detect_linux_fallback()
        assert hw.total_memory_gb > 20


def _detect_linux_fallback():
    """Helper to test Linux detection with a mocked /proc/meminfo."""
    from evolution.quant_selector import _detect_linux
    return _detect_linux()


class TestSelection:
    def test_selects_turbo4_for_large_memory(self):
        """With plenty of memory, should pick turbo4 (best quality)."""
        with patch("evolution.quant_selector.detect_hardware") as mock_hw:
            mock_hw.return_value = HardwareInfo(
                total_memory_gb=64,
                available_memory_gb=48,
                gpu_memory_gb=64,
                gpu_name="Test GPU",
                cpu_cores=16,
                detection_method="mock",
            )
            sel = select_quant_level(model_size_gb=14.0, context_length=32768)
            assert sel.level.name == "turbo4"
            assert sel.headroom_gb > 0

    def test_selects_smaller_for_tight_memory(self):
        """With tight memory, should pick a smaller quant."""
        with patch("evolution.quant_selector.detect_hardware") as mock_hw:
            mock_hw.return_value = HardwareInfo(
                total_memory_gb=16,
                available_memory_gb=12,
                gpu_memory_gb=16,
                gpu_name="Test GPU",
                cpu_cores=8,
                detection_method="mock",
            )
            sel = select_quant_level(model_size_gb=14.0, context_length=131072)
            # Should pick a smaller quant for 128K context on 16GB
            assert sel.level.bits_per_channel <= 4.0

    def test_preferred_level(self):
        """The user can force a specific level."""
        with patch("evolution.quant_selector.detect_hardware") as mock_hw:
            mock_hw.return_value = HardwareInfo(
                total_memory_gb=64,
                available_memory_gb=48,
                cpu_cores=16,
                detection_method="mock",
            )
            sel = select_quant_level(
                model_size_gb=14.0, context_length=32768,
                preferred_level="turbo2"
            )
            assert sel.level.name == "turbo2"

    def test_env_vars_populated(self):
        with patch("evolution.quant_selector.detect_hardware") as mock_hw:
            mock_hw.return_value = HardwareInfo(
                total_memory_gb=64,
                available_memory_gb=48,
                cpu_cores=16,
                detection_method="mock",
            )
            sel = select_quant_level(model_size_gb=14.0, context_length=32768)
            assert "TURBO_LAYER_ADAPTIVE" in sel.env_vars
            assert "-ctk" in sel.server_flags
            assert "-ctv" in sel.server_flags

    def test_warnings_on_low_headroom(self):
        with patch("evolution.quant_selector.detect_hardware") as mock_hw:
            mock_hw.return_value = HardwareInfo(
                total_memory_gb=18,
                available_memory_gb=14,
                gpu_memory_gb=18,
                gpu_name="Test GPU",
                cpu_cores=8,
                detection_method="mock",
            )
            sel = select_quant_level(model_size_gb=16.0, context_length=65536)
            assert len(sel.warnings) > 0

    def test_reasoning_contains_key_info(self):
        with patch("evolution.quant_selector.detect_hardware") as mock_hw:
            mock_hw.return_value = HardwareInfo(
                total_memory_gb=32,
                available_memory_gb=24,
                is_apple_silicon=True,
                chip_name="M4 Max",
                cpu_cores=16,
                detection_method="mock",
            )
            sel = select_quant_level(model_size_gb=14.0, context_length=32768)
            assert "turbo4" in sel.reasoning
            assert "M4 Max" in sel.reasoning or "32GB" in sel.reasoning
tests/test_tool_call_integration.py (deleted file, 338 lines)
@@ -1,338 +0,0 @@
|
||||
"""
|
||||
Integration test: turboquant compressed model passes hermes tool calls (issue #82).
|
||||
|
||||
Validates that a TurboQuant-compressed model can:
|
||||
1. Parse hermes tool schemas correctly
|
||||
2. Format tool calls in OpenAI-compatible format
|
||||
3. Pass through the hermes agent conversation loop
|
||||
|
||||
Tests are structured as contract tests -- they validate the schema/format
|
||||
compatibility without requiring a running model server. The live inference
|
||||
test is skipped by default (requires llama-server with TurboQuant model).
|
||||
|
||||
Usage:
|
||||
pytest tests/test_tool_call_integration.py -v
|
||||
pytest tests/test_tool_call_integration.py -v -k live # run live test if server available
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import unittest
|
||||
|
||||
import pytest
|
||||
|
||||
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
||||
PROFILE_PATH = ROOT / "profiles" / "hermes-profile-gemma4-turboquant.yaml"
|
||||
BENCHMARKS_DIR = ROOT / "benchmarks"
|
||||
|
||||
|
||||
class TestHermesProfileSchema(unittest.TestCase):
    """Validate the hermes profile YAML has required fields for tool calling."""

    @classmethod
    def setUpClass(cls):
        import yaml

        cls.profile = yaml.safe_load(PROFILE_PATH.read_text())

    def test_profile_has_providers(self):
        assert "providers" in self.profile, "Profile must define providers"
        assert "primary" in self.profile["providers"], "Must have primary provider"

    def test_primary_provider_has_endpoint(self):
        primary = self.profile["providers"]["primary"]
        assert "endpoint" in primary, "Primary provider must have endpoint"
        assert primary["endpoint"].startswith("http"), "Endpoint must be HTTP(S) URL"

    def test_primary_provider_has_api_path(self):
        primary = self.profile["providers"]["primary"]
        assert "api_path" in primary, "Primary provider must have api_path"
        assert "/chat/completions" in primary["api_path"], (
            "api_path should be OpenAI-compatible /chat/completions"
        )

    def test_turboquant_settings_present(self):
        primary = self.profile["providers"]["primary"]
        assert "turboquant" in primary, "Must have turboquant config section"
        tq = primary["turboquant"]
        assert tq.get("enabled") is True, "TurboQuant must be enabled"
        assert tq.get("kv_type") in ("turbo2", "turbo3", "turbo4"), (
            "kv_type must be turbo2, turbo3, or turbo4"
        )

    def test_context_window_configured(self):
        primary = self.profile["providers"]["primary"]
        assert "context" in primary, "Must have context config"
        ctx = primary["context"]
        assert ctx.get("max_tokens", 0) >= 8192, (
            "max_tokens should be >= 8192 for TurboQuant value proposition"
        )


class TestToolSchemaCompatibility(unittest.TestCase):
    """Verify hermes tool schemas serialize to valid JSON for OpenAI tool_calls."""

    SAMPLE_TOOL_SCHEMAS = [
        {
            "type": "function",
            "function": {
                "name": "read_file",
                "description": "Read a text file with line numbers.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "path": {"type": "string", "description": "File path"},
                        "offset": {"type": "integer", "default": 1},
                        "limit": {"type": "integer", "default": 500},
                    },
                    "required": ["path"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "execute_code",
                "description": "Run a Python script.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "code": {"type": "string", "description": "Python code"},
                    },
                    "required": ["code"],
                },
            },
        },
        {
            "type": "function",
            "function": {
                "name": "web_search",
                "description": "Search the web.",
                "parameters": {
                    "type": "object",
                    "properties": {
                        "query": {"type": "string"},
                        "max_results": {"type": "integer", "default": 5},
                    },
                    "required": ["query"],
                },
            },
        },
    ]

    def test_tool_schemas_serialize_to_json(self):
        """Tool schemas must serialize without errors."""
        serialized = json.dumps(self.SAMPLE_TOOL_SCHEMAS)
        assert len(serialized) > 0
        parsed = json.loads(serialized)
        assert len(parsed) == len(self.SAMPLE_TOOL_SCHEMAS)

    def test_tool_schemas_have_required_openai_fields(self):
        """Each tool schema must have the fields OpenAI expects."""
        for tool in self.SAMPLE_TOOL_SCHEMAS:
            assert tool["type"] == "function", "Tool type must be 'function'"
            fn = tool["function"]
            assert "name" in fn, "Function must have name"
            assert "description" in fn, "Function must have description"
            assert "parameters" in fn, "Function must have parameters"
            params = fn["parameters"]
            assert params["type"] == "object", "Parameters type must be 'object'"
            assert "properties" in params, "Parameters must have properties"

    def test_tool_call_response_format(self):
        """Verify tool_call response matches OpenAI format."""
        tool_call = {
            "id": "call_abc123",
            "type": "function",
            "function": {
                "name": "read_file",
                "arguments": json.dumps({"path": "/tmp/test.txt"}),
            },
        }
        args = json.loads(tool_call["function"]["arguments"])
        assert args["path"] == "/tmp/test.txt"
        assert tool_call["function"]["name"] in [
            t["function"]["name"] for t in self.SAMPLE_TOOL_SCHEMAS
        ]

    def test_tool_names_are_valid_identifiers(self):
        """Tool names must be valid Python identifiers for hermes dispatch."""
        for tool in self.SAMPLE_TOOL_SCHEMAS:
            name = tool["function"]["name"]
            assert re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name), (
                f"Tool name '{name}' is not a valid identifier"
            )


class TestTurboquantServerConfig(unittest.TestCase):
    """Validate server startup configuration matches hermes profile."""

    def test_server_command_has_turboquant_flags(self):
        """The server command in the profile must include -ctk/-ctv flags."""
        profile_text = PROFILE_PATH.read_text()
        assert "-ctk" in profile_text, "Profile server command must include -ctk flag"
        assert "-ctv" in profile_text, "Profile server command must include -ctv flag"

    def test_server_command_has_context_flag(self):
        """Server command must set context size."""
        profile_text = PROFILE_PATH.read_text()
        assert re.search(r"-c\s+\d+", profile_text), (
            "Server command must include -c <context_size> flag"
        )

    def test_layer_adaptive_env_var(self):
        """Profile must set TURBO_LAYER_ADAPTIVE env var."""
        profile_text = PROFILE_PATH.read_text()
        assert "TURBO_LAYER_ADAPTIVE" in profile_text, (
            "Profile must configure TURBO_LAYER_ADAPTIVE"
        )
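

# A minimal profile sketch that would satisfy the contract tests in
# TestHermesProfileSchema and TestTurboquantServerConfig above. Only the
# asserted fields are grounded in the tests; server_command and env are
# assumed names, not the actual profile schema:
#
#   providers:
#     primary:
#       endpoint: http://localhost:8080
#       api_path: /v1/chat/completions
#       context:
#         max_tokens: 32768
#       turboquant:
#         enabled: true
#         kv_type: turbo4
#       server_command: llama-server -m model.gguf -c 32768 -ctk turbo4 -ctv turbo4
#       env:
#         TURBO_LAYER_ADAPTIVE: "1"

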
class TestBenchmarkData(unittest.TestCase):
    """Validate benchmark test prompts include tool-call test cases."""

    @classmethod
    def setUpClass(cls):
        prompts_path = BENCHMARKS_DIR / "test_prompts.json"
        cls.prompts = json.loads(prompts_path.read_text())

    def test_has_tool_call_test_prompt(self):
        """Benchmark prompts must include a tool-call format test."""
        categories = [p.get("category") for p in self.prompts]
        assert "tool_call_format" in categories, (
            "Benchmark must include a tool_call_format test case"
        )

    def test_tool_call_prompt_expects_json(self):
        """Tool call test prompt must expect JSON in the response."""
        tool_prompt = next(
            p for p in self.prompts if p.get("category") == "tool_call_format"
        )
        pattern = tool_prompt.get("expected_pattern", "")
        assert "json" in pattern.lower() or "\\{" in pattern, (
            "Tool call prompt must expect JSON-formatted response"
        )


@pytest.mark.skipif(
    not os.environ.get("TURBOQUANT_SERVER_URL"),
    reason="No TurboQuant server available (set TURBOQUANT_SERVER_URL to run)",
)
class TestLiveToolCallIntegration:
    """Live integration test -- requires running llama-server with TurboQuant."""

    def test_server_health(self):
        """Server must respond to /v1/models endpoint."""
        import requests

        url = os.environ["TURBOQUANT_SERVER_URL"]
        resp = requests.get(f"{url}/v1/models", timeout=10)
        assert resp.status_code == 200
        data = resp.json()
        assert "data" in data
        assert len(data["data"]) > 0

    def test_tool_call_completion(self):
        """Model must return a valid tool_call for a read_file prompt."""
        import requests

        url = os.environ["TURBOQUANT_SERVER_URL"]
        tools = [
            {
                "type": "function",
                "function": {
                    "name": "read_file",
                    "description": "Read a file",
                    "parameters": {
                        "type": "object",
                        "properties": {"path": {"type": "string"}},
                        "required": ["path"],
                    },
                },
            }
        ]
        resp = requests.post(
            f"{url}/v1/chat/completions",
            json={
                "model": "gemma-4",
                "messages": [
                    {"role": "user", "content": "Read the file at /tmp/test.txt"}
                ],
                "tools": tools,
                "tool_choice": "auto",
            },
            timeout=120,
        )
        assert resp.status_code == 200
        data = resp.json()
        choice = data["choices"][0]
        msg = choice["message"]
        if "tool_calls" in msg and msg["tool_calls"]:
            tc = msg["tool_calls"][0]
            assert tc["type"] == "function"
            assert tc["function"]["name"] == "read_file"
            args = json.loads(tc["function"]["arguments"])
            assert "path" in args
        else:
            # Some models answer in prose instead of emitting a tool_call;
            # accept any non-empty content so the test degrades gracefully.
            assert len(msg.get("content", "")) > 0

    def test_tool_call_with_multiple_tools(self):
        """Model must handle multiple available tools."""
        import requests

        url = os.environ["TURBOQUANT_SERVER_URL"]
        tools = [
            {
                "type": "function",
                "function": {
                    "name": "read_file",
                    "description": "Read a file",
                    "parameters": {
                        "type": "object",
                        "properties": {"path": {"type": "string"}},
                        "required": ["path"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "web_search",
                    "description": "Search the web",
                    "parameters": {
                        "type": "object",
                        "properties": {"query": {"type": "string"}},
                        "required": ["query"],
                    },
                },
            },
            {
                "type": "function",
                "function": {
                    "name": "execute_code",
                    "description": "Run Python code",
                    "parameters": {
                        "type": "object",
                        "properties": {"code": {"type": "string"}},
                        "required": ["code"],
                    },
                },
            },
        ]
        resp = requests.post(
            f"{url}/v1/chat/completions",
            json={
                "model": "gemma-4",
                "messages": [
                    {"role": "user", "content": "Search the web for 'bitcoin price'"}
                ],
                "tools": tools,
                "tool_choice": "auto",
            },
            timeout=120,
        )
        assert resp.status_code == 200
        data = resp.json()
        assert "choices" in data
        assert len(data["choices"]) > 0


if __name__ == "__main__":
    unittest.main()
@@ -1,277 +0,0 @@
#!/usr/bin/env python3
"""
TurboQuant Auto-Select — Choose optimal preset based on available memory.

Detects system memory and selects the best TurboQuant preset for
KV cache compression based on the overhead left after loading the model.
"""

import logging
from dataclasses import dataclass
from typing import Optional

logger = logging.getLogger(__name__)


# Preset definitions with quality/speed tradeoffs
PRESETS = {
    "turboquant_k8v4": {
        "name": "TurboQuant K8V4",
        "description": "Best quality, 2.6x compression",
        "min_overhead_gb": 8,
        "compression_ratio": 2.6,
        "quality": "best",
        "vllm_compatible": True,
    },
    "turboquant_4bit_nc": {
        "name": "TurboQuant 4-bit NC",
        "description": "Good quality, 3.8x compression",
        "min_overhead_gb": 4,
        "compression_ratio": 3.8,
        "quality": "good",
        "vllm_compatible": True,
    },
    "turboquant_3bit_nc": {
        "name": "TurboQuant 3-bit NC",
        "description": "Usable quality, 4.9x compression",
        "min_overhead_gb": 2,
        "compression_ratio": 4.9,
        "quality": "usable",
        "vllm_compatible": True,
    },
    "q4_0": {
        "name": "Q4_0 GGUF",
        "description": "GGUF fallback, no vLLM",
        "min_overhead_gb": 0,
        "compression_ratio": 4.0,
        "quality": "basic",
        "vllm_compatible": False,
    },
}

# Quality order (best to worst)
QUALITY_ORDER = ["turboquant_k8v4", "turboquant_4bit_nc", "turboquant_3bit_nc", "q4_0"]
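
# Worked example of the thresholds above (illustrative sizes, not benchmarks):
# a 14 GB model on a box with 24 GB available leaves 10 GB of overhead, which
# clears the 8 GB bar for K8V4; with 18 GB available the 4 GB of overhead only
# qualifies for 4-bit NC; with 12 GB available the model does not fit at all
# and select_preset() falls back to Q4_0 GGUF.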


@dataclass
class SystemInfo:
    """System memory information."""

    total_gb: float
    available_gb: float
    gpu_memory_gb: Optional[float] = None

    @classmethod
    def detect(cls) -> "SystemInfo":
        """Detect system memory."""
        import psutil

        mem = psutil.virtual_memory()
        total_gb = mem.total / (1024**3)
        available_gb = mem.available / (1024**3)

        # Try to detect GPU memory
        gpu_gb = None
        try:
            import subprocess

            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=memory.total", "--format=csv,noheader,nounits"],
                capture_output=True, text=True, timeout=5,
            )
            if result.returncode == 0:
                gpu_mb = int(result.stdout.strip().split("\n")[0])
                gpu_gb = gpu_mb / 1024
        except (FileNotFoundError, ValueError, subprocess.TimeoutExpired):
            pass

        return cls(
            total_gb=round(total_gb, 1),
            available_gb=round(available_gb, 1),
            gpu_memory_gb=round(gpu_gb, 1) if gpu_gb else None,
        )


@dataclass
class SelectionResult:
    """Result of preset selection."""

    preset: str
    reason: str
    overhead_gb: float
    quality: str
    compression_ratio: float
    vllm_compatible: bool

    def to_dict(self) -> dict:
        return {
            "preset": self.preset,
            "reason": self.reason,
            "overhead_gb": self.overhead_gb,
            "quality": self.quality,
            "compression_ratio": self.compression_ratio,
            "vllm_compatible": self.vllm_compatible,
        }


def select_preset(
    available_gb: float,
    model_size_gb: float,
    prefer_quality: bool = True,
    require_vllm: bool = False,
) -> SelectionResult:
    """
    Select the best TurboQuant preset based on available memory.

    Args:
        available_gb: Available system memory in GB
        model_size_gb: Model size in GB
        prefer_quality: If True, prefer higher quality presets
            (currently implicit: QUALITY_ORDER is always walked best-first)
        require_vllm: If True, only select vLLM-compatible presets

    Returns:
        SelectionResult with chosen preset and reasoning
    """
    overhead_gb = available_gb - model_size_gb

    if overhead_gb < 0:
        # Not enough memory for the model itself
        logger.warning(
            "Insufficient memory: need %.1f GB, have %.1f GB available",
            model_size_gb, available_gb,
        )
        return SelectionResult(
            preset="q4_0",
            reason=f"Insufficient memory ({abs(overhead_gb):.1f} GB deficit), using GGUF fallback",
            overhead_gb=overhead_gb,
            quality="basic",
            compression_ratio=4.0,
            vllm_compatible=False,
        )

    # Select the first (best-quality) preset whose overhead requirement is met
    for preset_name in QUALITY_ORDER:
        preset = PRESETS[preset_name]

        # Skip if vLLM required but not compatible
        if require_vllm and not preset["vllm_compatible"]:
            continue

        if overhead_gb >= preset["min_overhead_gb"]:
            reason = (
                f"Overhead {overhead_gb:.1f} GB >= {preset['min_overhead_gb']} GB "
                f"required for {preset['name']}"
            )
            logger.info("Selected preset: %s — %s", preset_name, reason)

            return SelectionResult(
                preset=preset_name,
                reason=reason,
                overhead_gb=overhead_gb,
                quality=preset["quality"],
                compression_ratio=preset["compression_ratio"],
                vllm_compatible=preset["vllm_compatible"],
            )

    # Fallback -- only reachable when require_vllm filtered out every preset.
    # Note it still returns the non-vLLM GGUF preset, so callers that truly
    # need vLLM should check vllm_compatible on the result.
    return SelectionResult(
        preset="q4_0",
        reason=f"Overhead {overhead_gb:.1f} GB too low for TurboQuant, using GGUF fallback",
        overhead_gb=overhead_gb,
        quality="basic",
        compression_ratio=4.0,
        vllm_compatible=False,
    )
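
# Usage sketch of the require_vllm caveat noted above (sizes illustrative):
#
#   result = select_preset(available_gb=15.0, model_size_gb=14.0, require_vllm=True)
#   assert result.preset == "q4_0" and not result.vllm_compatible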


def auto_select(
    model_size_gb: float,
    config_override: Optional[str] = None,
    prefer_quality: bool = True,
    require_vllm: bool = False,
) -> SelectionResult:
    """
    Auto-select preset based on system detection.

    Args:
        model_size_gb: Model size in GB
        config_override: Optional preset override from config
        prefer_quality: Prefer higher quality presets
        require_vllm: Require vLLM compatibility

    Returns:
        SelectionResult
    """
    # Check for config override
    if config_override:
        if config_override in PRESETS:
            preset = PRESETS[config_override]
            logger.info("Using config override: %s", config_override)
            return SelectionResult(
                preset=config_override,
                reason=f"Config override: {preset['name']}",
                overhead_gb=0,  # Unknown without system detection
                quality=preset["quality"],
                compression_ratio=preset["compression_ratio"],
                vllm_compatible=preset["vllm_compatible"],
            )
        else:
            logger.warning(
                "Unknown preset in config: %s, falling back to auto-select",
                config_override,
            )

    # Detect system
    sys_info = SystemInfo.detect()
    logger.info(
        "System: %.1f GB total, %.1f GB available, model: %.1f GB",
        sys_info.total_gb, sys_info.available_gb, model_size_gb,
    )

    # Select preset
    return select_preset(
        available_gb=sys_info.available_gb,
        model_size_gb=model_size_gb,
        prefer_quality=prefer_quality,
        require_vllm=require_vllm,
    )
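
# Programmatic usage sketch (the 8.0 GB model size is a placeholder):
#
#   result = auto_select(model_size_gb=8.0)
#   logger.info("Preset %s (%.1fx compression)", result.preset, result.compression_ratio)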


def get_preset_info(preset_name: str) -> Optional[dict]:
    """Get information about a preset."""
    return PRESETS.get(preset_name)


def list_presets() -> dict:
    """List all available presets."""
    return PRESETS.copy()


# CLI interface
if __name__ == "__main__":
    import argparse
    import json

    parser = argparse.ArgumentParser(description="TurboQuant Auto-Select")
    # Not required=True: requiring --model-size would make --list unusable
    # on its own; we validate it manually below.
    parser.add_argument("--model-size", type=float, help="Model size in GB")
    parser.add_argument("--preset", help="Config override preset")
    # BooleanOptionalAction (Python 3.9+) also generates --no-prefer-quality;
    # store_true with default=True could never be switched off.
    parser.add_argument(
        "--prefer-quality",
        action=argparse.BooleanOptionalAction,
        default=True,
        help="Prefer quality",
    )
    parser.add_argument("--require-vllm", action="store_true", help="Require vLLM compatibility")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--list", action="store_true", help="List all presets")

    args = parser.parse_args()

    if args.list:
        print("Available presets:")
        for name, info in PRESETS.items():
            vllm = "✓" if info["vllm_compatible"] else "✗"
            print(f"  {name:20} {info['quality']:8} {info['compression_ratio']}x vLLM:{vllm} {info['description']}")
    else:
        if args.model_size is None:
            parser.error("--model-size is required unless --list is given")
        result = auto_select(
            model_size_gb=args.model_size,
            config_override=args.preset,
            prefer_quality=args.prefer_quality,
            require_vllm=args.require_vllm,
        )

        if args.json:
            print(json.dumps(result.to_dict(), indent=2))
        else:
            print(f"Selected: {result.preset}")
            print(f"Reason: {result.reason}")
            print(f"Quality: {result.quality}")
            print(f"Compression: {result.compression_ratio}x")
            print(f"vLLM compatible: {result.vllm_compatible}")
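
# Example invocations, assuming this module is saved as turboquant_autoselect.py
# (the filename is an assumption; it is not shown in this diff):
#
#   python3 turboquant_autoselect.py --list
#   python3 turboquant_autoselect.py --model-size 8.0 --json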