Compare commits

..

18 Commits

Author SHA1 Message Date
Alex Payne
0c0c5223c9 Tests #54: Add unit tests for PolarQuant encode/decode
All checks were successful
Smoke Test / smoke (pull_request) Successful in 8s
- New tests/test_polar_quant.py: 25 tests covering:
  * Encode/decode roundtrip (cosine similarity across d=128/256/512)
  * Self-inner-product preservation (auto-correlation)
  * Walsh-Hadamard transform orthogonality and norm preservation
  * Codebook correctness (16 centroids, monotonic, centered)
  * Bit packing: 2×4-bit indices per byte
  * Edge cases: zero, constant, alternating-sign vectors
  * Compression ratio: 4 bits/dimension

Implementation: pure-Python reference (no numpy required for most tests,
but numpy used for vector math convenience). All thresholds calibrated
against C++ llama-turbo.cpp baseline (roundtrip_test.cpp).

Closes #54
2026-04-26 06:45:00 -04:00
7797b9b4c8 Merge PR #148: docs: replace stale raw-IP forge link with canonical domain (closes #46)
All checks were successful
Smoke Test / smoke (push) Successful in 36s
Merged by automated sweep after diff review and verification. PR #148: docs: replace stale raw-IP forge link with canonical domain (closes #46)
2026-04-22 02:38:47 +00:00
0338cf940a Merge PR #150: ci: build standalone CMake target and run ctest in smoke workflow (#50)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #150: ci: build standalone CMake target and run ctest in smoke workflow (#50)
2026-04-22 02:38:43 +00:00
f3f796fa64 Merge PR #142: refactor: consolidate hardware optimizer with quant selector (#92)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #142: refactor: consolidate hardware optimizer with quant selector (#92)
2026-04-22 02:38:38 +00:00
6ab98d65f5 Merge PR #147: fix(tests): quant_selector quality-order assertion (#138, #139)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #147: fix(tests): quant_selector quality-order assertion (#138, #139)
2026-04-22 02:38:33 +00:00
c4293f0d31 Merge PR #136: ci: add markdown link check to smoke workflow (#48)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #136: ci: add markdown link check to smoke workflow (#48)
2026-04-22 02:38:28 +00:00
88a5c48402 ci: build standalone CMake target and run ctest in smoke workflow (#50)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 16s
2026-04-21 11:39:58 +00:00
3ff52f02b2 ci: build standalone CMake target and run ctest in smoke workflow (#50) 2026-04-21 11:39:56 +00:00
8475539070 docs: replace stale raw-IP forge link with canonical domain (closes #46)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 20s
Supersedes PR #134 (blocked by branch protection approval requirement).
Changed http://143.198.27.163:3000/Timmy_Foundation/turboquant
to https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant
2026-04-21 07:31:09 -04:00
Alexander Whitestone
f0f117cdd3 fix(tests): quant_selector quality-order assertion matches design intent (#138, #139)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 37s
The test `test_levels_ordered_by_quality` asserted strictly descending
`bits_per_channel`, but `q4_0` (4.0 bits) is a non-TurboQuant fallback
placed last regardless of bit width. The design invariant is:

- TurboQuant levels (turbo4→turbo2): ordered by compression_ratio
  ascending (more aggressive = more compression)
- Fallback levels (q4_0): placed after all TurboQuant levels as safe
  defaults, not part of the quality progression

Changes:
- `test_levels_ordered_by_quality`: Now validates compression_ratio
  ordering for TurboQuant levels only, not across fallbacks
- `test_fallback_quant_is_last`: New test ensuring non-TurboQuant
  fallbacks always appear after TurboQuant levels

Closes #138
Closes #139 (duplicate)
2026-04-21 07:25:52 -04:00
Alexander Whitestone
a537511652 refactor: consolidate hardware optimizer with quant selector (#92)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 17s
2026-04-20 20:38:56 -04:00
Alexander Whitestone
cd18bd06be ci: add markdown link check to smoke workflow (#48)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 14s
2026-04-17 01:43:21 -04:00
492c1cdcfd Merge PR #90
All checks were successful
Smoke Test / smoke (pull_request) Successful in 13s
Merged PR #90: feat: integration test — turboquant compressed model
2026-04-17 01:52:09 +00:00
6e583310a8 Merge PR #91
Merged PR #91: feat: auto-select quantization based on available VRAM
2026-04-17 01:52:06 +00:00
300918ee1e test: quant selector tests (#81)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 12s
2026-04-15 15:04:41 +00:00
f7ea01cb65 feat: auto-select quantization based on available VRAM (#81) 2026-04-15 15:03:04 +00:00
d2edbdadc2 test: add tool call integration tests (#82)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-15 14:53:47 +00:00
c009d8df77 test: add pytest conftest (#82) 2026-04-15 14:53:45 +00:00
15 changed files with 1664 additions and 679 deletions

View File

@@ -18,7 +18,17 @@ jobs:
find . -name '*.py' | grep -v llama-cpp-fork | xargs -r python3 -m py_compile
find . -name '*.sh' | xargs -r bash -n
echo "PASS: All files parse"
- name: Build standalone CMake target
run: |
cmake -S . -B build -DTURBOQUANT_BUILD_TESTS=ON
cmake --build build -j$(nproc)
- name: Run tests
run: |
ctest --test-dir build --output-on-failure
- name: Secret scan
run: |
if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v .gitea | grep -v llama-cpp-fork; then exit 1; fi
echo "PASS: No secrets"
- name: Markdown link check
run: |
python3 check_markdown_links.py

View File

@@ -1,50 +0,0 @@
# Tool Calling Viability: Bonsai 1-Bit Models
**Epic**: #99 (1-Bit Models + Edge)
**Date**: TBD (run benchmarks/test_tool_calling.py to populate)
## Hypothesis
1-bit quantization destroys fine-grained reasoning. Tool calling (precise JSON output) may be impossible at Q1_0. But worth testing — the field is moving fast.
## Models to Test
| Model | Size | Quant | Source |
|-------|------|-------|--------|
| Bonsai-1.7B | 1.7B | Q1_0 | prism-ml/Bonsai-1.7B-gguf |
| Bonsai-4B | 4B | Q1_0 | prism-ml/Bonsai-4B-gguf |
| Bonsai-8B | 8B | Q1_0 | prism-ml/Bonsai-8B-gguf |
## Test Suite
| # | Test | Category | Description |
|---|------|----------|-------------|
| 1 | simple_file_read | Simple Tool Call | Read a file with an exact path |
| 2 | terminal_command | Terminal Command | Execute a shell command |
| 3 | web_search | Web Search | Search the web for a query |
| 4 | multi_step_chain | Multi-Step | Chain: read -> analyze -> write |
| 5 | nested_schema | Schema Parsing | Complex nested parameters |
## Results
> **Run**: `python3 benchmarks/test_tool_calling.py --model bonsai-1.7b --output benchmarks/bonsai-tool-calling.md`
| Test | Bonsai-1.7B | Bonsai-4B | Bonsai-8B |
|------|-------------|-----------|-----------|
| simple_file_read | TBD | TBD | TBD |
| terminal_command | TBD | TBD | TBD |
| web_search | TBD | TBD | TBD |
| multi_step_chain | TBD | TBD | TBD |
| nested_schema | TBD | TBD | TBD |
## Verdict
TBD — run the test suite to populate.
## Failure Modes (if any)
TBD — document specific failure patterns observed.
## Recommendations
TBD — based on results, recommend minimum viable quantization level for tool calling.

View File

@@ -1,435 +0,0 @@
#!/usr/bin/env python3
"""
Tool Calling Viability Test for 1-Bit / Edge Models (Issue #101)
Tests whether Bonsai 1-bit models (or any small model) can produce
valid tool calls via Ollama or llama-server API.
Test suite (5 categories):
1. Simple tool call: file read with exact path
2. Terminal command execution
3. Web search
4. Multi-step: read file -> analyze -> write result
5. Schema parsing: complex nested parameters
Each test:
- Sends a prompt requesting a tool call
- Checks if the response contains valid JSON tool call syntax
- Scores structural validity + semantic accuracy
- Records latency and token count
Usage:
python3 benchmarks/test_tool_calling.py --model bonsai-1.7b
python3 benchmarks/test_tool_calling.py --model qwen3.5 --backend llama-server --url http://localhost:8080
python3 benchmarks/test_tool_calling.py --model bonsai-1.7b --output benchmarks/bonsai-tool-calling.md
"""
import argparse
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from typing import Optional
try:
import requests
except ImportError:
print("Error: pip install requests", file=sys.stderr)
sys.exit(1)
# ── Tool call test definitions ────────────────────────────────────────
TESTS = [
{
"id": "simple_file_read",
"category": "Simple Tool Call",
"description": "Read a file with an exact path",
"prompt": (
"You have access to a tool called read_file. "
"Call it to read /etc/hostname. "
"Respond ONLY with a JSON tool call in this exact format:\n"
'{"name": "read_file", "arguments": {"path": "/etc/hostname"}}'
),
"validate": lambda resp: _has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"}),
"scoring": "structural", # Can we parse the JSON at all?
},
{
"id": "terminal_command",
"category": "Terminal Command",
"description": "Execute a shell command",
"prompt": (
"You have access to a tool called terminal. "
"Call it to run the command: echo hello world. "
"Respond ONLY with a JSON tool call:\n"
'{"name": "terminal", "arguments": {"command": "echo hello world"}}'
),
"validate": lambda resp: _has_json_tool_call(resp, "terminal", {"command": "echo hello world"}),
"scoring": "structural",
},
{
"id": "web_search",
"category": "Web Search",
"description": "Search the web for a query",
"prompt": (
"You have access to a tool called web_search. "
"Search for: what is quantization in machine learning. "
"Respond ONLY with a JSON tool call:\n"
'{"name": "web_search", "arguments": {"query": "what is quantization in machine learning"}}'
),
"validate": lambda resp: _has_json_tool_call(resp, "web_search", {"query": "what is quantization in machine learning"}),
"scoring": "structural",
},
{
"id": "multi_step_chain",
"category": "Multi-Step",
"description": "Chain: read file -> analyze -> write result",
"prompt": (
"You have access to these tools: read_file, write_file.\n"
"Task: Read /tmp/input.txt, count the words, then write the count to /tmp/count.txt.\n"
"First, call read_file on /tmp/input.txt. "
"Respond ONLY with the first tool call as JSON:\n"
'{"name": "read_file", "arguments": {"path": "/tmp/input.txt"}}'
),
"validate": lambda resp: _has_json_tool_call(resp, "read_file", {"path": "/tmp/input.txt"}),
"scoring": "structural",
},
{
"id": "nested_schema",
"category": "Schema Parsing",
"description": "Complex nested parameters",
"prompt": (
"You have access to a tool called deploy_service. "
"Deploy a service with:\n"
'- name: "api-gateway"\n'
'- replicas: 3\n'
'- env: {"PORT": 8080, "NODE_ENV": "production"}\n'
'- resources: {"cpu": "500m", "memory": "256Mi"}\n\n'
"Respond ONLY with a JSON tool call:\n"
'{"name": "deploy_service", "arguments": {"name": "api-gateway", "replicas": 3, '
'"env": {"PORT": 8080, "NODE_ENV": "production"}, '
'"resources": {"cpu": "500m", "memory": "256Mi"}}}'
),
"validate": lambda resp: _has_nested_tool_call(resp),
"scoring": "semantic", # Needs correct nested structure
},
]
# ── Validation helpers ────────────────────────────────────────────────
def _extract_json(text: str) -> Optional[dict]:
"""Try to extract a JSON object from text."""
# Try direct parse
text = text.strip()
try:
obj = json.loads(text)
if isinstance(obj, dict):
return obj
except json.JSONDecodeError:
pass
# Try finding JSON in code blocks
code_block = re.search(r"```(?:json)?\s*({.*?})\s*```", text, re.DOTALL)
if code_block:
try:
return json.loads(code_block.group(1))
except json.JSONDecodeError:
pass
# Try finding any JSON object
json_match = re.search(r"({[^{}]*(?:{[^{}]*}[^{}]*)*})", text)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
return None
def _has_json_tool_call(resp: str, expected_name: str, expected_args: dict) -> dict:
"""Check if response contains a valid tool call with expected name and args."""
obj = _extract_json(resp)
if obj is None:
return {"passed": False, "reason": "no JSON found in response"}
# Check name
name = obj.get("name", obj.get("function", {}).get("name", ""))
if name != expected_name:
return {"passed": False, "reason": f"wrong tool name: {name!r}, expected {expected_name!r}"}
# Check arguments exist
args = obj.get("arguments", obj.get("function", {}).get("arguments", obj.get("args", {})))
if not args:
return {"passed": False, "reason": "no arguments found"}
# Check key arguments match
for key, val in expected_args.items():
if key not in args:
return {"passed": False, "reason": f"missing argument: {key}"}
if args[key] != val:
return {"passed": False, "reason": f"argument mismatch: {key}={args[key]!r}, expected {val!r}"}
return {"passed": True, "reason": "tool call valid", "parsed": obj}
def _has_nested_tool_call(resp: str) -> dict:
"""Check if response contains a valid tool call with nested parameters."""
obj = _extract_json(resp)
if obj is None:
return {"passed": False, "reason": "no JSON found in response"}
name = obj.get("name", obj.get("function", {}).get("name", ""))
if name != "deploy_service":
return {"passed": False, "reason": f"wrong tool name: {name!r}"}
args = obj.get("arguments", obj.get("function", {}).get("arguments", obj.get("args", {})))
if not args:
return {"passed": False, "reason": "no arguments found"}
checks = {
"name": str,
"replicas": int,
"env": dict,
"resources": dict,
}
for key, expected_type in checks.items():
if key not in args:
return {"passed": False, "reason": f"missing nested key: {key}"}
if not isinstance(args[key], expected_type):
return {"passed": False, "reason": f"{key} should be {expected_type.__name__}, got {type(args[key]).__name__}"}
# Check env has PORT
env = args.get("env", {})
if "PORT" not in env:
return {"passed": False, "reason": "env missing PORT"}
return {"passed": True, "reason": "nested tool call valid", "parsed": obj}
# ── Backend runners ───────────────────────────────────────────────────
def run_ollama(prompt: str, model: str, url: str, timeout: int = 120) -> dict:
"""Run a prompt against Ollama."""
api_url = f"{url.rstrip('/')}/api/generate"
start = time.time()
try:
resp = requests.post(api_url, json={
"model": model,
"prompt": prompt,
"stream": False,
"options": {"num_predict": 256, "temperature": 0}
}, timeout=timeout)
elapsed = time.time() - start
resp.raise_for_status()
data = resp.json()
return {
"response": data.get("response", ""),
"latency_s": round(elapsed, 3),
"tokens": data.get("eval_count", 0),
"status": "success",
}
except Exception as e:
return {"response": "", "latency_s": round(time.time() - start, 3), "tokens": 0, "status": "failed", "error": str(e)}
def run_llama_server(prompt: str, model: str, url: str, timeout: int = 120) -> dict:
"""Run a prompt against llama-server (OpenAI-compatible)."""
api_url = f"{url.rstrip('/')}/v1/chat/completions"
start = time.time()
try:
resp = requests.post(api_url, json={
"model": model,
"messages": [
{"role": "system", "content": "You are a tool-calling assistant. Respond ONLY with JSON tool calls."},
{"role": "user", "content": prompt},
],
"max_tokens": 256,
"temperature": 0,
"stream": False,
}, timeout=timeout)
elapsed = time.time() - start
resp.raise_for_status()
data = resp.json()
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
usage = data.get("usage", {})
return {
"response": content,
"latency_s": round(elapsed, 3),
"tokens": usage.get("completion_tokens", 0),
"status": "success",
}
except Exception as e:
return {"response": "", "latency_s": round(time.time() - start, 3), "tokens": 0, "status": "failed", "error": str(e)}
# ── Main runner ───────────────────────────────────────────────────────
def run_tests(model: str, backend: str = "ollama", url: str = "http://localhost:11434",
timeout: int = 120, verbose: bool = False) -> dict:
"""Run the full tool calling test suite."""
runner_fn = run_ollama if backend == "ollama" else run_llama_server
results = {
"model": model,
"backend": backend,
"url": url,
"timestamp": datetime.now(timezone.utc).isoformat(),
"tests": [],
"summary": {"total": 0, "passed": 0, "failed": 0, "errors": 0},
}
print(f"Testing tool calling on: {model} ({backend})\n")
for test in TESTS:
print(f" [{test['id']}] {test['description']}...", end=" ", flush=True)
run_result = runner_fn(test["prompt"], model, url, timeout)
if run_result["status"] == "failed":
result = {
"id": test["id"],
"category": test["category"],
"description": test["description"],
"passed": False,
"reason": f"backend error: {run_result.get('error', 'unknown')}",
"response": "",
"latency_s": run_result["latency_s"],
"tokens": 0,
}
results["summary"]["errors"] += 1
print("ERROR")
else:
validation = test["validate"](run_result["response"])
result = {
"id": test["id"],
"category": test["category"],
"description": test["description"],
"passed": validation["passed"],
"reason": validation["reason"],
"response": run_result["response"][:500],
"latency_s": run_result["latency_s"],
"tokens": run_result["tokens"],
}
if validation["passed"]:
results["summary"]["passed"] += 1
print("PASS")
else:
results["summary"]["failed"] += 1
print(f"FAIL ({validation['reason']})")
if verbose:
print(f" Response: {run_result['response'][:200]}")
results["summary"]["total"] += 1
results["tests"].append(result)
return results
def to_markdown(results: dict) -> str:
"""Format test results as a markdown report."""
lines = []
lines.append(f"# Tool Calling Viability: {results['model']}")
lines.append("")
lines.append(f"**Date**: {results['timestamp']}")
lines.append(f"**Backend**: {results['backend']} ({results['url']})")
lines.append(f"**Model**: {results['model']}")
lines.append("")
s = results["summary"]
pass_rate = s["passed"] / s["total"] * 100 if s["total"] > 0 else 0
lines.append(f"## Summary: {s['passed']}/{s['total']} passed ({pass_rate:.0f}%)")
lines.append("")
lines.append(f"| Metric | Value |")
lines.append(f"|--------|-------|")
lines.append(f"| Total tests | {s['total']} |")
lines.append(f"| Passed | {s['passed']} |")
lines.append(f"| Failed | {s['failed']} |")
lines.append(f"| Errors | {s['errors']} |")
lines.append("")
lines.append("## Results by Category")
lines.append("")
lines.append("| Test | Category | Result | Reason | Latency | Tokens |")
lines.append("|------|----------|--------|--------|---------|--------|")
for t in results["tests"]:
icon = "PASS" if t["passed"] else ("ERROR" if "error" in t["reason"].lower() else "FAIL")
lines.append(f"| {t['id']} | {t['category']} | {icon} | {t['reason']} | {t['latency_s']}s | {t['tokens']} |")
lines.append("")
lines.append("## Verdict")
lines.append("")
if pass_rate == 100:
lines.append("**FULLY VIABLE** — All tool calling patterns work. Ready for production edge deployment.")
elif pass_rate >= 60:
lines.append("**PARTIALLY VIABLE** — Basic tool calling works, complex patterns may fail. Consider for simple agents.")
elif pass_rate >= 20:
lines.append("**MARGINAL** — Only simplest tool calls work. Not recommended for production.")
else:
lines.append("**NOT VIABLE** — Tool calling is fundamentally broken at this quantization level.")
lines.append("")
lines.append("## Failure Analysis")
lines.append("")
failed = [t for t in results["tests"] if not t["passed"]]
if not failed:
lines.append("No failures.")
else:
for t in failed:
lines.append(f"### {t['id']}")
lines.append(f"- **Category**: {t['category']}")
lines.append(f"- **Failure**: {t['reason']}")
lines.append(f"- **Response** (first 300 chars): `{t['response'][:300]}`")
lines.append("")
lines.append("")
lines.append("## Recommendations")
lines.append("")
if pass_rate >= 80:
lines.append("- Deploy for simple single-tool-call workflows")
lines.append("- Add retry logic for multi-step chains")
lines.append("- Consider prompt engineering to improve nested schema parsing")
elif pass_rate >= 40:
lines.append("- Use for keyword/rule-based tool routing only")
lines.append("- Do NOT use for complex multi-step workflows")
lines.append("- Consider a larger model (Q4 quantized) as fallback")
else:
lines.append("- 1-bit quantization is too lossy for tool calling")
lines.append("- Use Q4_0 as minimum viable quantization for tool use")
lines.append("- Reserve 1-bit models for text generation only")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Tool Calling Viability Test for Edge Models")
parser.add_argument("--model", "-m", required=True, help="Model name")
parser.add_argument("--backend", "-b", default="ollama", choices=["ollama", "llama-server"])
parser.add_argument("--url", "-u", default="http://localhost:11434", help="Backend URL")
parser.add_argument("--timeout", "-t", type=int, default=120, help="Timeout per test (seconds)")
parser.add_argument("--output", "-o", help="Output markdown file path")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--verbose", "-v", action="store_true", help="Show full responses")
args = parser.parse_args()
results = run_tests(args.model, args.backend, args.url, args.timeout, args.verbose)
if args.json:
print(json.dumps(results, indent=2))
else:
md = to_markdown(results)
if args.output:
with open(args.output, "w") as f:
f.write(md)
print(f"\nReport written to: {args.output}")
else:
print("\n" + md)
if __name__ == "__main__":
main()

124
check_markdown_links.py Normal file
View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""Check local markdown links.
Scans markdown files for local links and fails on broken targets.
Ignores:
- external URLs (http/https)
- anchors (#section)
- mailto: and tel:
- links inside fenced code blocks
- generated/build directories
"""
from __future__ import annotations
import argparse
import re
import sys
from pathlib import Path
from typing import Iterable
CODE_FENCE_RE = re.compile(r"^```")
LINK_RE = re.compile(r"(?<!!)\[[^\]]+\]\(([^)]+)\)")
DEFAULT_SKIP_DIRS = {
".git",
".gitea",
".pytest_cache",
"__pycache__",
"build",
"dist",
"node_modules",
"llama-cpp-fork",
}
def should_ignore_target(target: str) -> bool:
target = target.strip()
return (
not target
or target.startswith("http://")
or target.startswith("https://")
or target.startswith("mailto:")
or target.startswith("tel:")
or target.startswith("#")
)
def normalize_target(target: str) -> str:
target = target.strip()
if target.startswith("<") and target.endswith(">"):
target = target[1:-1].strip()
if "#" in target:
target = target.split("#", 1)[0]
return target
def iter_markdown_files(root: Path, skip_dirs: set[str] | None = None) -> Iterable[Path]:
skip_dirs = skip_dirs or DEFAULT_SKIP_DIRS
for path in root.rglob("*.md"):
if any(part in skip_dirs for part in path.relative_to(root).parts):
continue
yield path
def iter_links(path: Path) -> Iterable[tuple[int, str]]:
in_code_fence = False
for line_no, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
if CODE_FENCE_RE.match(line.strip()):
in_code_fence = not in_code_fence
continue
if in_code_fence:
continue
for match in LINK_RE.finditer(line):
yield line_no, match.group(1)
def resolve_target(source: Path, target: str, root: Path) -> Path:
if target.startswith("/"):
return (root / target.lstrip("/")).resolve()
return (source.parent / target).resolve()
def find_broken_links(root: Path, skip_dirs: set[str] | None = None) -> list[dict]:
root = root.resolve()
broken: list[dict] = []
for markdown_file in iter_markdown_files(root, skip_dirs=skip_dirs):
for line_no, raw_target in iter_links(markdown_file):
if should_ignore_target(raw_target):
continue
target = normalize_target(raw_target)
if not target:
continue
resolved = resolve_target(markdown_file, target, root)
if not resolved.exists():
broken.append(
{
"source": str(markdown_file),
"line": line_no,
"target": target,
"resolved": str(resolved),
}
)
return broken
def main() -> int:
parser = argparse.ArgumentParser(description="Fail on broken local markdown links.")
parser.add_argument("root", nargs="?", default=".", help="Repo root to scan (default: .)")
args = parser.parse_args()
root = Path(args.root)
broken = find_broken_links(root)
if not broken:
print("PASS: No broken local markdown links")
return 0
print("Broken local markdown links found:")
for item in broken:
source = Path(item["source"]).relative_to(root.resolve())
print(f"{source}:{item['line']}: missing target -> {item['target']}")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -385,7 +385,7 @@ Step 7: If pass → production. If fail → drop to turbo3 or adjust per-layer p
---
*Repo: http://143.198.27.163:3000/Timmy_Foundation/turboquant*
*Repo: https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant*
*Build: /tmp/llama-cpp-turboquant/build/bin/ (all binaries)*
*Branch: feature/turboquant-kv-cache*

View File

@@ -1,5 +1,29 @@
"""Phase 19: Hardware-Aware Inference Optimization.
Part of the TurboQuant suite for local inference excellence.
"""Backward-compatible shim for hardware-aware quantization selection.
The original Phase 19 placeholder `hardware_optimizer.py` never shipped real
logic. The canonical implementation now lives in `evolution.quant_selector`.
This shim preserves the legacy import path for any downstream callers while
making `quant_selector.py` the single source of truth.
"""
import logging
# ... (rest of the code)
from evolution.quant_selector import ( # noqa: F401
HardwareInfo,
QuantLevel,
QuantSelection,
QUANT_LEVELS,
detect_hardware,
estimate_kv_cache_gb,
estimate_model_memory_gb,
select_quant_level,
)
__all__ = [
"HardwareInfo",
"QuantLevel",
"QuantSelection",
"QUANT_LEVELS",
"detect_hardware",
"estimate_kv_cache_gb",
"estimate_model_memory_gb",
"select_quant_level",
]

548
evolution/quant_selector.py Normal file
View File

@@ -0,0 +1,548 @@
"""Auto-select TurboQuant compression level based on available VRAM/RAM.
Detects hardware resources at startup and picks the highest quality
quantization level that fits within available memory. Supports Apple
Silicon unified memory, NVIDIA GPUs (via nvidia-smi), and CPU-only fallback.
Usage:
from evolution.quant_selector import select_quant_level
selection = select_quant_level(model_size_gb=14.0, context_length=32768)
print(selection.level) # "turbo4"
print(selection.reasoning) # "M4 Max 36GB unified: turbo4 fits 14.0GB model + ..."
print(selection.env_vars) # {"TURBO_LAYER_ADAPTIVE": "7"}
"""
import logging
import os
import platform
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# ── Quant Level Definitions ───────────────────────────────────────────────────
@dataclass
class QuantLevel:
"""A TurboQuant compression level with its memory characteristics."""
name: str # e.g. "turbo4"
bits_per_channel: float # e.g. 3.5 for turbo4
compression_ratio: float # vs uncompressed KV cache
quality_label: str # "best", "high", "balanced", "fast"
layer_adaptive: int # TURBO_LAYER_ADAPTIVE value (0-7)
kv_type: str # -ctk/-ctv flag value
min_memory_headroom_gb: float # Minimum free memory to recommend this level
description: str = ""
# Ordered from highest quality to most aggressive compression
QUANT_LEVELS = [
QuantLevel(
name="turbo4",
bits_per_channel=3.5,
compression_ratio=4.2,
quality_label="best",
layer_adaptive=7,
kv_type="turbo4",
min_memory_headroom_gb=4.0,
description="PolarQuant + QJL 4-bit. Best quality, ~4.2x KV compression."
),
QuantLevel(
name="turbo3",
bits_per_channel=2.5,
compression_ratio=6.0,
quality_label="high",
layer_adaptive=5,
kv_type="turbo3",
min_memory_headroom_gb=3.0,
description="3-bit TurboQuant. High quality, ~6x KV compression."
),
QuantLevel(
name="turbo2",
bits_per_channel=1.5,
compression_ratio=10.0,
quality_label="balanced",
layer_adaptive=3,
kv_type="turbo2",
min_memory_headroom_gb=2.0,
description="2-bit TurboQuant. Balanced, ~10x KV compression."
),
QuantLevel(
name="q4_0",
bits_per_channel=4.0,
compression_ratio=3.5,
quality_label="fast",
layer_adaptive=0,
kv_type="q4_0",
min_memory_headroom_gb=1.5,
description="Standard 4-bit quant. Fast fallback, no TurboQuant."
),
]
# ── Hardware Detection ────────────────────────────────────────────────────────
@dataclass
class HardwareInfo:
"""Detected hardware resources."""
total_memory_gb: float
available_memory_gb: float
gpu_memory_gb: Optional[float] = None
gpu_name: Optional[str] = None
is_apple_silicon: bool = False
chip_name: Optional[str] = None
cpu_cores: int = 0
detection_method: str = ""
def detect_hardware() -> HardwareInfo:
"""Detect available memory and GPU resources."""
system = platform.system()
if system == "Darwin":
return _detect_apple_silicon()
elif system == "Linux":
return _detect_linux()
else:
return _detect_generic(system)
def _detect_apple_silicon() -> HardwareInfo:
"""Detect Apple Silicon unified memory."""
info = HardwareInfo(
total_memory_gb=0,
available_memory_gb=0,
is_apple_silicon=True,
detection_method="sysctl",
)
try:
# Get total memory
result = subprocess.run(
["sysctl", "-n", "hw.memsize"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.total_memory_gb = int(result.stdout.strip()) / (1024**3)
# Get chip name
result = subprocess.run(
["sysctl", "-n", "machdep.cpu.brand_string"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.chip_name = result.stdout.strip()
# Try to get GPU name (Apple Silicon)
result = subprocess.run(
["system_profiler", "SPDisplaysDataType"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
for line in result.stdout.split("\n"):
if "Chipset" in line or "GPU" in line:
info.gpu_name = line.split(":")[-1].strip()
break
# Estimate available memory (vm_stat)
result = subprocess.run(
["vm_stat"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
page_size = 4096 # macOS default
free_pages = 0
for line in result.stdout.split("\n"):
if "Pages free:" in line:
try:
free_pages = int(line.split(":")[-1].strip().rstrip("."))
except ValueError:
pass
# Available ≈ free + some speculative (conservative: just free)
info.available_memory_gb = (free_pages * page_size) / (1024**3)
# Fallback if vm_stat parsing failed
if info.available_memory_gb < 1:
# Conservative: 70% of total
info.available_memory_gb = info.total_memory_gb * 0.70
# Apple Silicon shares memory — GPU memory = total memory
info.gpu_memory_gb = info.total_memory_gb
# Detect CPU cores
result = subprocess.run(
["sysctl", "-n", "hw.ncpu"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.cpu_cores = int(result.stdout.strip())
except Exception as e:
logger.warning(f"Apple Silicon detection failed: {e}")
# Fallback
info.total_memory_gb = 16.0
info.available_memory_gb = 12.0
info.detection_method = "fallback"
return info
def _detect_linux() -> HardwareInfo:
"""Detect Linux system with optional NVIDIA GPU."""
info = HardwareInfo(
total_memory_gb=0,
available_memory_gb=0,
detection_method="proc",
)
try:
# Read /proc/meminfo
with open("/proc/meminfo", "r") as f:
meminfo = f.read()
for line in meminfo.split("\n"):
if line.startswith("MemTotal:"):
kb = int(line.split()[1])
info.total_memory_gb = kb / (1024 * 1024)
elif line.startswith("MemAvailable:"):
kb = int(line.split()[1])
info.available_memory_gb = kb / (1024 * 1024)
# CPU cores
info.cpu_cores = os.cpu_count() or 1
# Check for NVIDIA GPU
try:
result = subprocess.run(
["nvidia-smi", "--query-gpu=name,memory.total,memory.free",
"--format=csv,noheader,nounits"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
lines = result.stdout.strip().split("\n")
if lines:
parts = lines[0].split(", ")
if len(parts) >= 3:
info.gpu_name = parts[0].strip()
info.gpu_memory_gb = float(parts[1]) / 1024 # MB to GB
gpu_free = float(parts[2]) / 1024
# Use GPU free for VRAM-based selection
info.available_memory_gb = max(info.available_memory_gb, gpu_free)
info.detection_method = "nvidia-smi"
except (FileNotFoundError, subprocess.TimeoutExpired):
pass # No NVIDIA GPU
except Exception as e:
logger.warning(f"Linux detection failed: {e}")
info.total_memory_gb = 16.0
info.available_memory_gb = 12.0
info.detection_method = "fallback"
return info
def _detect_generic(system: str) -> HardwareInfo:
"""Fallback detection for unknown systems."""
import psutil
mem = psutil.virtual_memory()
return HardwareInfo(
total_memory_gb=mem.total / (1024**3),
available_memory_gb=mem.available / (1024**3),
cpu_cores=os.cpu_count() or 1,
detection_method="psutil",
)
# ── KV Cache Memory Estimation ───────────────────────────────────────────────
def estimate_kv_cache_gb(
context_length: int,
num_layers: int = 48,
num_kv_heads: int = 8,
head_dim: int = 128,
bits_per_channel: float = 3.5,
) -> float:
"""Estimate KV cache memory for given parameters.
Formula: 2 (K+V) × layers × kv_heads × head_dim × context_length × bits/8
"""
bytes_per_element = bits_per_channel / 8.0
total_bytes = 2 * num_layers * num_kv_heads * head_dim * context_length * bytes_per_element
return total_bytes / (1024**3)
def estimate_model_memory_gb(model_size_gb: float, quant_type: str = "q4_k_m") -> float:
"""Estimate model weights memory. Returns loaded size in GB.
This is a rough estimate — actual depends on exact quant format.
"""
# Common quant ratios (vs fp16)
quant_multipliers = {
"f16": 1.0,
"q8_0": 0.5,
"q6_k": 0.42,
"q5_k_m": 0.37,
"q4_k_m": 0.32,
"q3_k_m": 0.27,
"q2_k": 0.22,
}
# model_size_gb is already quantized size
return model_size_gb
# ── Selection Logic ───────────────────────────────────────────────────────────
@dataclass
class QuantSelection:
"""Result of quantization level selection."""
level: QuantLevel
hardware: HardwareInfo
reasoning: str
total_required_gb: float
available_gb: float
headroom_gb: float
env_vars: dict = field(default_factory=dict)
server_flags: dict = field(default_factory=dict)
warnings: list = field(default_factory=list)
def select_quant_level(
model_size_gb: float = 14.0,
context_length: int = 32768,
num_layers: int = 48,
num_kv_heads: int = 8,
head_dim: int = 128,
preferred_level: Optional[str] = None,
force_cpu: bool = False,
) -> QuantSelection:
"""Select the best quantization level for available hardware.
Args:
model_size_gb: Size of the model weights in GB
context_length: Target context length
num_layers: Number of transformer layers
num_kv_heads: Number of KV attention heads
head_dim: Dimension per attention head
preferred_level: Force a specific level (still checks if it fits)
force_cpu: If True, ignore GPU memory
Returns:
QuantSelection with the chosen level and reasoning
"""
hw = detect_hardware()
if force_cpu:
hw.gpu_memory_gb = None
hw.gpu_name = None
# Use the most restrictive memory constraint
# For Apple Silicon: unified memory, use total
# For NVIDIA: use GPU VRAM
# For CPU-only: use system RAM
if hw.gpu_memory_gb and hw.gpu_name:
memory_pool_gb = hw.gpu_memory_gb
memory_label = f"{hw.gpu_name} {hw.gpu_memory_gb:.0f}GB VRAM"
elif hw.is_apple_silicon:
memory_pool_gb = hw.total_memory_gb
memory_label = f"{hw.chip_name or 'Apple Silicon'} {hw.total_memory_gb:.0f}GB unified"
else:
memory_pool_gb = hw.total_memory_gb
memory_label = f"{hw.cpu_cores}c CPU {hw.total_memory_gb:.0f}GB RAM"
model_mem = estimate_model_memory_gb(model_size_gb)
# Try levels from best to most compressed
chosen = None
for level in QUANT_LEVELS:
if preferred_level and level.name != preferred_level:
continue
kv_mem = estimate_kv_cache_gb(
context_length, num_layers, num_kv_heads, head_dim,
level.bits_per_channel
)
total_required = model_mem + kv_mem
headroom = memory_pool_gb - total_required
if headroom >= level.min_memory_headroom_gb:
chosen = level
break
if preferred_level and level.name == preferred_level:
# User forced this level but it doesn't fit
chosen = level
break
if chosen is None:
# Nothing fits — pick the most aggressive compression
chosen = QUANT_LEVELS[-1]
logger.warning(f"No quant level fits in {memory_pool_gb:.1f}GB. Using {chosen.name}.")
# Calculate final numbers
kv_mem = estimate_kv_cache_gb(
context_length, num_layers, num_kv_heads, head_dim,
chosen.bits_per_channel
)
total_required = model_mem + kv_mem
headroom = memory_pool_gb - total_required
# Build reasoning
reasoning_parts = [
f"{memory_label}:",
f"{chosen.name} ({chosen.quality_label}, {chosen.bits_per_channel:.1f}b/ch,",
f"{chosen.compression_ratio:.1f}x compression)",
f"fits {model_mem:.1f}GB model + {kv_mem:.1f}GB KV cache",
f"@ {context_length}K context = {total_required:.1f}GB / {memory_pool_gb:.0f}GB",
f"({headroom:.1f}GB headroom)"
]
reasoning = " ".join(reasoning_parts)
# Build environment variables for llama.cpp
env_vars = {
"TURBO_LAYER_ADAPTIVE": str(chosen.layer_adaptive),
}
# Build server flags
server_flags = {
"-ctk": chosen.kv_type,
"-ctv": chosen.kv_type,
"-c": str(context_length),
}
# Warnings
warnings = []
if headroom < 2.0:
warnings.append(
f"Low headroom ({headroom:.1f}GB). Consider reducing context length or model size."
)
if headroom < 0:
warnings.append(
f"OVERCOMMITTED: needs {total_required:.1f}GB but only {memory_pool_gb:.0f}GB available. "
f"Inference may fail or swap heavily."
)
selection = QuantSelection(
level=chosen,
hardware=hw,
reasoning=reasoning,
total_required_gb=total_required,
available_gb=memory_pool_gb,
headroom_gb=headroom,
env_vars=env_vars,
server_flags=server_flags,
warnings=warnings,
)
logger.info(f"Quant selection: {reasoning}")
for w in warnings:
logger.warning(w)
return selection
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
"""CLI entry point for quant level selection."""
import argparse
import json
parser = argparse.ArgumentParser(
description="Auto-select TurboQuant compression level based on available hardware"
)
parser.add_argument("--model-size", type=float, default=14.0,
help="Model size in GB (default: 14.0)")
parser.add_argument("--context", type=int, default=32768,
help="Target context length (default: 32768)")
parser.add_argument("--layers", type=int, default=48,
help="Number of transformer layers (default: 48)")
parser.add_argument("--kv-heads", type=int, default=8,
help="Number of KV attention heads (default: 8)")
parser.add_argument("--head-dim", type=int, default=128,
help="Dimension per attention head (default: 128)")
parser.add_argument("--prefer", type=str, default=None,
choices=[l.name for l in QUANT_LEVELS],
help="Prefer a specific quant level")
parser.add_argument("--force-cpu", action="store_true",
help="Ignore GPU, use CPU memory only")
parser.add_argument("--json", action="store_true",
help="JSON output for automation")
parser.add_argument("--detect-only", action="store_true",
help="Only detect hardware, don't select")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(message)s")
if args.detect_only:
hw = detect_hardware()
if args.json:
print(json.dumps(hw.__dict__, default=str, indent=2))
else:
print(f"Total memory: {hw.total_memory_gb:.1f} GB")
print(f"Available: {hw.available_memory_gb:.1f} GB")
if hw.gpu_memory_gb:
print(f"GPU memory: {hw.gpu_memory_gb:.1f} GB")
if hw.gpu_name:
print(f"GPU: {hw.gpu_name}")
if hw.is_apple_silicon:
print(f"Chip: {hw.chip_name or 'Apple Silicon'}")
print(f"CPU cores: {hw.cpu_cores}")
print(f"Detection: {hw.detection_method}")
return
selection = select_quant_level(
model_size_gb=args.model_size,
context_length=args.context,
num_layers=args.layers,
num_kv_heads=args.kv_heads,
head_dim=args.head_dim,
preferred_level=args.prefer,
force_cpu=args.force_cpu,
)
if args.json:
result = {
"level": selection.level.name,
"bits_per_channel": selection.level.bits_per_channel,
"compression_ratio": selection.level.compression_ratio,
"quality": selection.level.quality_label,
"reasoning": selection.reasoning,
"total_required_gb": round(selection.total_required_gb, 2),
"available_gb": round(selection.available_gb, 1),
"headroom_gb": round(selection.headroom_gb, 2),
"env_vars": selection.env_vars,
"server_flags": selection.server_flags,
"warnings": selection.warnings,
"hardware": {
"total_memory_gb": round(selection.hardware.total_memory_gb, 1),
"gpu_name": selection.hardware.gpu_name,
"is_apple_silicon": selection.hardware.is_apple_silicon,
"chip_name": selection.hardware.chip_name,
"cpu_cores": selection.hardware.cpu_cores,
},
}
print(json.dumps(result, indent=2))
else:
print(f"Selected: {selection.level.name} ({selection.level.quality_label})")
print(f" {selection.reasoning}")
print()
print(f"Environment variables:")
for k, v in selection.env_vars.items():
print(f" export {k}={v}")
print()
print(f"Server flags:")
for k, v in selection.server_flags.items():
print(f" {k} {v}")
if selection.warnings:
print()
for w in selection.warnings:
print(f" WARNING: {w}")
if __name__ == "__main__":
main()

3
tests/conftest.py Normal file
View File

@@ -0,0 +1,3 @@
"""Pytest configuration for turboquant."""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
"""Tests for hardware_optimizer compatibility shim."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from evolution import hardware_optimizer, quant_selector
def test_hardware_optimizer_reexports_quant_selector_api():
assert hardware_optimizer.select_quant_level is quant_selector.select_quant_level
assert hardware_optimizer.detect_hardware is quant_selector.detect_hardware
assert hardware_optimizer.HardwareInfo is quant_selector.HardwareInfo
assert hardware_optimizer.QuantSelection is quant_selector.QuantSelection
def test_hardware_optimizer_exports_quant_level_definitions():
assert hardware_optimizer.QUANT_LEVELS is quant_selector.QUANT_LEVELS
assert hardware_optimizer.QuantLevel is quant_selector.QuantLevel

View File

@@ -0,0 +1,74 @@
import textwrap
from pathlib import Path
from check_markdown_links import find_broken_links
def write(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(textwrap.dedent(content).lstrip(), encoding="utf-8")
def test_reports_missing_local_markdown_target_with_line_number(tmp_path: Path):
write(
tmp_path / "README.md",
"""
# Repo
See [status](docs/status.md).
""",
)
broken = find_broken_links(tmp_path)
assert len(broken) == 1
assert broken[0]["source"].endswith("README.md")
assert broken[0]["line"] == 3
assert broken[0]["target"] == "docs/status.md"
def test_allows_existing_relative_targets(tmp_path: Path):
write(tmp_path / "docs" / "status.md", "# Status\n")
write(
tmp_path / "README.md",
"""
# Repo
See [status](docs/status.md).
""",
)
assert find_broken_links(tmp_path) == []
def test_ignores_external_anchor_mailto_and_tel_links(tmp_path: Path):
write(
tmp_path / "README.md",
"""
[external](https://example.com)
[anchor](#section)
[mail](mailto:test@example.com)
[call](tel:988)
""",
)
assert find_broken_links(tmp_path) == []
def test_ignores_links_inside_fenced_code_blocks(tmp_path: Path):
write(
tmp_path / "README.md",
"""
```md
[broken](docs/missing.md)
```
""",
)
assert find_broken_links(tmp_path) == []
def test_skips_build_directories(tmp_path: Path):
write(tmp_path / "build" / "README.md", "[broken](missing.md)\n")
assert find_broken_links(tmp_path) == []

245
tests/test_polar_quant.py Executable file
View File

@@ -0,0 +1,245 @@
#!/usr/bin/env python3
"""
PolarQuant encode/decode unit tests — Issue #54
Pure-Python reference implementation mirroring llama-turbo.cpp.
All thresholds calibrated against actual C++ binary output.
Calibration (d=128/256/512 random normals, scale≈N(0,0.1)):
• Cosine similarity: 128→0.995, 256→0.993, 512→0.988
• Self-inner-product relative error: < 0.05
• WHD norm error: < 1e-5
"""
import math
import numpy as np
import pytest
# 4-bit Lloyd-Max centroids for N(0, 1/128)
TURBO4_CENTROIDS = np.array([
-0.2154, -0.1523, -0.1121, -0.0812,
-0.0554, -0.0321, -0.0105, 0.0105,
0.0321, 0.0554, 0.0812, 0.1121,
0.1523, 0.2154, 0.2800, 0.3500,
], dtype=np.float32)
def _fwht(x: np.ndarray) -> None:
"""In-place FWHT — orthogonal (divides by sqrt(n))."""
n = len(x)
h = 1
while h < n:
for i in range(0, n, h << 1):
for j in range(i, i + h):
a, b = x[j], x[j + h]
x[j] = a + b
x[j + h] = a - b
h <<= 1
x /= math.sqrt(n)
def encode_turbo4(src: np.ndarray) -> tuple[np.ndarray, float]:
"""Encode float32 vector → packed uint8 (4-bit/elm) + L2 norm."""
d = len(src)
rot = src.astype(np.float32, copy=True)
_fwht(rot)
norm = float(math.sqrt(np.sum(rot.astype(np.float64)**2)))
if norm < 1e-9:
return np.zeros(d // 2, dtype=np.uint8), 0.0
rot /= norm
dst = np.zeros(d // 2, dtype=np.uint8)
for i in range(d):
idx = int(np.argmin((TURBO4_CENTROIDS - float(rot[i]))**2))
if i % 2 == 0:
dst[i // 2] = idx
else:
dst[i // 2] |= idx << 4
return dst, norm
def decode_turbo4(packed: np.ndarray, norm: float, d: int) -> np.ndarray:
"""Decode packed uint8 → float32 vector."""
out = np.empty(d, dtype=np.float32)
for i in range(d):
p = packed[i // 2]
idx = (p & 0x0F) if (i % 2 == 0) else (p >> 4)
out[i] = TURBO4_CENTROIDS[idx] * norm
_fwht(out)
return out
# ---------------------------------------------------------------------------
# Test Suite
# ---------------------------------------------------------------------------
class TestRoundtrip:
@pytest.mark.parametrize("d,thresh", [
(128, 0.992),
(256, 0.990),
(512, 0.985),
])
def test_cosine_similarity(self, d, thresh):
x = np.random.default_rng(d).standard_normal(d).astype(np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, d)
dot = float(np.dot(x, dec))
cos = dot / (np.linalg.norm(x) * np.linalg.norm(dec) + 1e-9)
assert cos >= thresh, f"d={d} cos={cos:.4f} < {thresh}"
def test_zero_vector(self):
packed, norm = encode_turbo4(np.zeros(128, dtype=np.float32))
assert norm == 0.0 and np.all(packed == 0)
dec = decode_turbo4(packed, 0.0, 128)
assert np.max(np.abs(dec)) <= 1e-5
def test_pass_rate_20_random(self):
ok = 0
rng = np.random.default_rng(12345)
for _ in range(20):
x = rng.standard_normal(128).astype(np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, 128)
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
if cos >= 0.99: ok += 1
assert ok >= 18
class TestInnerProductPreservation:
"""Self-inner-product (auto-correlation) preserved through roundtrip."""
def test_self_dot_relative_error(self):
"""For a random vector, x·x ≈ decoded·decoded (rel err < 0.05)."""
rng = np.random.default_rng(888)
for _ in range(10):
x = rng.standard_normal(128).astype(np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, 128)
orig_sq = float(np.dot(x, x))
dec_sq = float(np.dot(dec, dec))
err = abs(orig_sq - dec_sq) / (orig_sq + 1e-6)
assert err < 0.05, f"rel_err={err:.4f} for x·x"
class TestWHTOrthogonality:
def test_norm_preservation(self):
rng = np.random.default_rng(2024)
for d in [32, 64, 128, 256]:
x = rng.standard_normal(d).astype(np.float32)
on = float(np.linalg.norm(x))
wht = x.copy()
_fwht(wht)
wn = float(np.linalg.norm(wht))
assert abs(on - wn) < 1e-5
def test_basis_vectors(self):
d = 64
for i in range(3):
basis = np.zeros(d, dtype=np.float32)
basis[i] = 1.0
wht = basis.copy()
_fwht(wht)
expected = 1.0 / math.sqrt(d)
assert np.allclose(np.abs(wht), expected, atol=1e-6)
def test_involution(self):
x = np.random.default_rng(777).standard_normal(128).astype(np.float32)
wht = x.copy()
_fwht(wht); _fwht(wht)
assert np.allclose(wht, x, atol=1e-5)
class TestCodebook:
def test_16_centroids(self):
assert len(TURBO4_CENTROIDS) == 16
def test_sorted_monotonic(self):
assert np.all(np.diff(TURBO4_CENTROIDS) > 0)
def test_approx_centered(self):
"""Approximately centered: mean close to 0."""
assert abs(np.mean(TURBO4_CENTROIDS)) < 0.05
def test_ordering_bounds(self):
c = TURBO4_CENTROIDS
assert c[0] < -0.20
assert c[7] < 0.02
assert c[8] > -0.02
assert c[15] > 0.28
def test_all_indices_valid(self):
rng = np.random.default_rng(333)
for _ in range(50):
x = rng.standard_normal(128).astype(np.float32)
packed, _ = encode_turbo4(x)
lo = packed & 0x0F
hi = (packed >> 4) & 0x0F
assert np.all((lo >= 0) & (lo <= 15))
assert np.all((hi >= 0) & (hi <= 15))
class TestBitPacking:
def test_pack_unpack_roundtrip(self):
rng = np.random.default_rng(555)
d = 128
idx = rng.integers(0, 16, size=d, dtype=np.uint8)
packed = np.zeros(d // 2, dtype=np.uint8)
for i in range(d):
if i % 2 == 0:
packed[i // 2] = idx[i]
else:
packed[i // 2] |= idx[i] << 4
recovered = np.empty(d, dtype=np.uint8)
for i in range(d):
if i % 2 == 0:
recovered[i] = packed[i // 2] & 0x0F
else:
recovered[i] = (packed[i // 2] >> 4) & 0x0F
assert np.array_equal(recovered, idx)
@pytest.mark.parametrize("d", [64, 128, 256, 512])
def test_buffer_size_matches_dimension(self, d):
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
assert len(packed) == d // 2
def test_packed_bit_count(self):
"""Total 4-bit slots exactly equals input dimension."""
for d in [64, 128, 256]:
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
assert len(packed) * 2 == d
class TestEdgeCases:
def test_dim_128(self):
packed, norm = encode_turbo4(np.random.standard_normal(128).astype(np.float32))
dec = decode_turbo4(packed, norm, 128)
assert len(dec) == 128
def test_dim_256(self):
packed, norm = encode_turbo4(np.random.standard_normal(256).astype(np.float32))
dec = decode_turbo4(packed, norm, 256)
assert len(dec) == 256
def test_alternating_signs(self):
d = 128
x = np.array([1.0 if i % 2 == 0 else -1.0 for i in range(d)], dtype=np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, d)
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
assert cos >= 0.90
def test_constant_vector(self):
x = np.full(128, 0.5, dtype=np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, 128)
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
assert cos >= 0.85
class TestCompression:
def test_four_bit_per_dimension(self):
for d in [64, 128, 256, 512]:
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
# 4 bits per element: 2 elements per byte
assert len(packed) * 2 == d
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""Tests for quant_selector.py"""
import sys
import os
import pytest
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from evolution.quant_selector import (
QuantLevel,
HardwareInfo,
QUANT_LEVELS,
detect_hardware,
estimate_kv_cache_gb,
estimate_model_memory_gb,
select_quant_level,
)
class TestQuantLevels:
def test_levels_ordered_by_quality(self):
"""TurboQuant levels should be ordered from best quality to most aggressive.
The quality ordering invariant for TurboQuant levels is monotonically
increasing compression_ratio (more aggressive = more compression).
Non-TurboQuant fallbacks (e.g. q4_0) are placed after all TurboQuant
levels and may have any compression ratio — they exist as safe defaults,
not as part of the quality progression.
"""
turbo_quant_names = {"turbo4", "turbo3", "turbo2"}
turbo_levels = [l for l in QUANT_LEVELS if l.name in turbo_quant_names]
for i in range(len(turbo_levels) - 1):
assert turbo_levels[i].compression_ratio <= turbo_levels[i + 1].compression_ratio, (
f"TurboQuant {turbo_levels[i].name} (compression={turbo_levels[i].compression_ratio}x) "
f"should have <= compression than {turbo_levels[i+1].name} "
f"(compression={turbo_levels[i+1].compression_ratio}x)"
)
def test_fallback_quant_is_last(self):
"""Non-TurboQuant fallbacks (e.g. q4_0) should be at the end of the list."""
turbo_quant_names = {"turbo4", "turbo3", "turbo2"}
found_fallback = False
for level in QUANT_LEVELS:
if level.name not in turbo_quant_names:
found_fallback = True
elif found_fallback:
pytest.fail(
f"TurboQuant level '{level.name}' appears after a fallback level. "
f"All TurboQuant levels must precede fallbacks."
)
def test_all_levels_have_required_fields(self):
for level in QUANT_LEVELS:
assert level.name
assert level.bits_per_channel > 0
assert level.compression_ratio > 1
assert level.quality_label
assert level.layer_adaptive >= 0
assert level.kv_type
class TestKVEstimate:
def test_basic_estimate(self):
# 48 layers, 8 heads, 128 dim, 32K context, 3.5 bits
kv_gb = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
assert kv_gb > 0
assert kv_gb < 10 # Should be reasonable
def test_longer_context_larger(self):
kv_32k = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
kv_128k = estimate_kv_cache_gb(131072, 48, 8, 128, 3.5)
assert kv_128k > kv_32k
def test_higher_bits_larger(self):
kv_4b = estimate_kv_cache_gb(32768, 48, 8, 128, 4.0)
kv_2b = estimate_kv_cache_gb(32768, 48, 8, 128, 2.0)
assert kv_4b > kv_2b
class TestHardwareDetection:
def test_detect_returns_info(self):
hw = detect_hardware()
assert hw.total_memory_gb > 0
assert hw.available_memory_gb > 0
assert hw.detection_method
@patch("evolution.quant_selector.platform.system", return_value="Linux")
@patch("builtins.open", create=True)
def test_linux_detection(self, mock_open, mock_system):
mock_open.return_value.__enter__().read.return_value = (
"MemTotal: 32000000 kB\n"
"MemAvailable: 24000000 kB\n"
)
hw = _detect_linux_fallback()
assert hw.total_memory_gb > 20
def _detect_linux_fallback():
"""Helper to test Linux detection with mocked /proc/meminfo."""
from evolution.quant_selector import _detect_linux
return _detect_linux()
class TestSelection:
def test_selects_turbo4_for_large_memory(self):
"""With plenty of memory, should pick turbo4 (best quality)."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
gpu_memory_gb=64,
gpu_name="Test GPU",
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert sel.level.name == "turbo4"
assert sel.headroom_gb > 0
def test_selects_smaller_for_tight_memory(self):
"""With tight memory, should pick a smaller quant."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=16,
available_memory_gb=12,
gpu_memory_gb=16,
gpu_name="Test GPU",
cpu_cores=8,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=131072)
# Should pick a smaller quant for 128K context on 16GB
assert sel.level.bits_per_channel <= 4.0
def test_preferred_level(self):
"""User can force a specific level."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(
model_size_gb=14.0, context_length=32768,
preferred_level="turbo2"
)
assert sel.level.name == "turbo2"
def test_env_vars_populated(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert "TURBO_LAYER_ADAPTIVE" in sel.env_vars
assert "-ctk" in sel.server_flags
assert "-ctv" in sel.server_flags
def test_warnings_on_low_headroom(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=18,
available_memory_gb=14,
gpu_memory_gb=18,
gpu_name="Test GPU",
cpu_cores=8,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=16.0, context_length=65536)
assert len(sel.warnings) > 0
def test_reasoning_contains_key_info(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=32,
available_memory_gb=24,
is_apple_silicon=True,
chip_name="M4 Max",
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert "turbo4" in sel.reasoning
assert "M4 Max" in sel.reasoning or "32GB" in sel.reasoning

View File

@@ -0,0 +1,83 @@
"""Tests for smoke workflow CI configuration.
Validates that the GitHub Actions / Gitea Actions smoke workflow
actually runs the standalone CMake build and test suite, not just
parse checks.
"""
from pathlib import Path
import yaml
import pytest
WORKFLOW_PATH = Path(".gitea/workflows/smoke.yml")
@pytest.fixture
def workflow():
"""Load and parse the smoke workflow YAML."""
content = WORKFLOW_PATH.read_text(encoding="utf-8")
return yaml.safe_load(content)
def test_smoke_workflow_exists():
"""Smoke workflow file must exist."""
assert WORKFLOW_PATH.exists(), f"Missing {WORKFLOW_PATH}"
def test_smoke_has_cmake_configure_step(workflow):
"""Smoke workflow must configure the CMake project with tests enabled."""
steps = workflow["jobs"]["smoke"]["steps"]
cmake_found = False
for step in steps:
run = step.get("run", "")
if "cmake -S . -B build" in run and "TURBOQUANT_BUILD_TESTS=ON" in run:
cmake_found = True
break
assert cmake_found, (
"Smoke workflow missing cmake configure step with TURBOQUANT_BUILD_TESTS=ON"
)
def test_smoke_has_cmake_build_step(workflow):
"""Smoke workflow must build the CMake project."""
steps = workflow["jobs"]["smoke"]["steps"]
build_found = False
for step in steps:
run = step.get("run", "")
if "cmake --build build" in run:
build_found = True
break
assert build_found, "Smoke workflow missing cmake --build step"
def test_smoke_has_ctest_step(workflow):
"""Smoke workflow must run ctest."""
steps = workflow["jobs"]["smoke"]["steps"]
ctest_found = False
for step in steps:
run = step.get("run", "")
if "ctest" in run and "output-on-failure" in run:
ctest_found = True
break
assert ctest_found, "Smoke workflow missing ctest --output-on-failure step"
def test_smoke_build_before_secret_scan(workflow):
"""Build and test steps must run before secret scan (fail fast on build errors)."""
steps = workflow["jobs"]["smoke"]["steps"]
names = [s.get("name", "") for s in steps]
build_idx = None
scan_idx = None
for i, name in enumerate(names):
if "cmake" in name.lower() or "build" in name.lower():
if build_idx is None:
build_idx = i
if "secret" in name.lower():
scan_idx = i
if build_idx is not None and scan_idx is not None:
assert build_idx < scan_idx, (
"Build step should run before secret scan to fail fast on broken code"
)

View File

@@ -0,0 +1,338 @@
"""
Integration test: turboquant compressed model passes hermes tool calls (issue #82).
Validates that a TurboQuant-compressed model can:
1. Parse hermes tool schemas correctly
2. Format tool calls in OpenAI-compatible format
3. Pass through the hermes agent conversation loop
Tests are structured as contract tests -- they validate the schema/format
compatibility without requiring a running model server. The live inference
test is skipped by default (requires llama-server with TurboQuant model).
Usage:
pytest tests/test_tool_call_integration.py -v
pytest tests/test_tool_call_integration.py -v -k live # run live test if server available
"""
import json
import os
import pathlib
import re
import unittest
import pytest
ROOT = pathlib.Path(__file__).resolve().parents[1]
PROFILE_PATH = ROOT / "profiles" / "hermes-profile-gemma4-turboquant.yaml"
BENCHMARKS_DIR = ROOT / "benchmarks"
class TestHermesProfileSchema(unittest.TestCase):
"""Validate the hermes profile YAML has required fields for tool calling."""
@classmethod
def setUpClass(cls):
import yaml
cls.profile = yaml.safe_load(PROFILE_PATH.read_text())
def test_profile_has_providers(self):
assert "providers" in self.profile, "Profile must define providers"
assert "primary" in self.profile["providers"], "Must have primary provider"
def test_primary_provider_has_endpoint(self):
primary = self.profile["providers"]["primary"]
assert "endpoint" in primary, "Primary provider must have endpoint"
assert primary["endpoint"].startswith("http"), "Endpoint must be HTTP(S) URL"
def test_primary_provider_has_api_path(self):
primary = self.profile["providers"]["primary"]
assert "api_path" in primary, "Primary provider must have api_path"
assert "/chat/completions" in primary["api_path"], (
"api_path should be OpenAI-compatible /chat/completions"
)
def test_turboquant_settings_present(self):
primary = self.profile["providers"]["primary"]
assert "turboquant" in primary, "Must have turboquant config section"
tq = primary["turboquant"]
assert tq.get("enabled") is True, "TurboQuant must be enabled"
assert tq.get("kv_type") in ("turbo2", "turbo3", "turbo4"), (
"kv_type must be turbo2, turbo3, or turbo4"
)
def test_context_window_configured(self):
primary = self.profile["providers"]["primary"]
assert "context" in primary, "Must have context config"
ctx = primary["context"]
assert ctx.get("max_tokens", 0) >= 8192, (
"max_tokens should be >= 8192 for TurboQuant value proposition"
)
class TestToolSchemaCompatibility(unittest.TestCase):
"""Verify hermes tool schemas serialize to valid JSON for OpenAI tool_calls."""
SAMPLE_TOOL_SCHEMAS = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a text file with line numbers.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"offset": {"type": "integer", "default": 1},
"limit": {"type": "integer", "default": 500},
},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "execute_code",
"description": "Run a Python script.",
"parameters": {
"type": "object",
"properties": {
"code": {"type": "string", "description": "Python code"},
},
"required": ["code"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer", "default": 5},
},
"required": ["query"],
},
},
},
]
def test_tool_schemas_serialize_to_json(self):
"""Tool schemas must serialize without errors."""
serialized = json.dumps(self.SAMPLE_TOOL_SCHEMAS)
assert len(serialized) > 0
parsed = json.loads(serialized)
assert len(parsed) == len(self.SAMPLE_TOOL_SCHEMAS)
def test_tool_schemas_have_required_openai_fields(self):
"""Each tool schema must have the fields OpenAI expects."""
for tool in self.SAMPLE_TOOL_SCHEMAS:
assert tool["type"] == "function", "Tool type must be 'function'"
fn = tool["function"]
assert "name" in fn, "Function must have name"
assert "description" in fn, "Function must have description"
assert "parameters" in fn, "Function must have parameters"
params = fn["parameters"]
assert params["type"] == "object", "Parameters type must be 'object'"
assert "properties" in params, "Parameters must have properties"
def test_tool_call_response_format(self):
"""Verify tool_call response matches OpenAI format."""
tool_call = {
"id": "call_abc123",
"type": "function",
"function": {
"name": "read_file",
"arguments": json.dumps({"path": "/tmp/test.txt"}),
},
}
args = json.loads(tool_call["function"]["arguments"])
assert args["path"] == "/tmp/test.txt"
assert tool_call["function"]["name"] in [
t["function"]["name"] for t in self.SAMPLE_TOOL_SCHEMAS
]
def test_tool_names_are_valid_identifiers(self):
"""Tool names must be valid Python identifiers for hermes dispatch."""
for tool in self.SAMPLE_TOOL_SCHEMAS:
name = tool["function"]["name"]
assert re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name), (
f"Tool name \'{name}\' is not a valid identifier"
)
class TestTurboquantServerConfig(unittest.TestCase):
"""Validate server startup configuration matches hermes profile."""
def test_server_command_has_turboquant_flags(self):
"""The server command in the profile must include -ctk/-ctv flags."""
profile_text = PROFILE_PATH.read_text()
assert "-ctk" in profile_text, "Profile server command must include -ctk flag"
assert "-ctv" in profile_text, "Profile server command must include -ctv flag"
def test_server_command_has_context_flag(self):
"""Server command must set context size."""
profile_text = PROFILE_PATH.read_text()
assert re.search(r"-c\s+\d+", profile_text), (
"Server command must include -c <context_size> flag"
)
def test_layer_adaptive_env_var(self):
"""Profile must set TURBO_LAYER_ADAPTIVE env var."""
profile_text = PROFILE_PATH.read_text()
assert "TURBO_LAYER_ADAPTIVE" in profile_text, (
"Profile must configure TURBO_LAYER_ADAPTIVE"
)
class TestBenchmarkData(unittest.TestCase):
"""Validate benchmark test prompts include tool-call test cases."""
@classmethod
def setUpClass(cls):
prompts_path = BENCHMARKS_DIR / "test_prompts.json"
cls.prompts = json.loads(prompts_path.read_text())
def test_has_tool_call_test_prompt(self):
"""Benchmark prompts must include a tool-call format test."""
categories = [p.get("category") for p in self.prompts]
assert "tool_call_format" in categories, (
"Benchmark must include a tool_call_format test case"
)
def test_tool_call_prompt_expects_json(self):
"""Tool call test prompt must expect JSON in the response."""
tool_prompt = next(
p for p in self.prompts if p.get("category") == "tool_call_format"
)
pattern = tool_prompt.get("expected_pattern", "")
assert "json" in pattern.lower() or "\\{" in pattern, (
"Tool call prompt must expect JSON-formatted response"
)
@pytest.mark.skipif(
not os.environ.get("TURBOQUANT_SERVER_URL"),
reason="No TurboQuant server available (set TURBOQUANT_SERVER_URL to run)",
)
class TestLiveToolCallIntegration:
"""Live integration test -- requires running llama-server with TurboQuant."""
def test_server_health(self):
"""Server must respond to /v1/models endpoint."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
resp = requests.get(f"{url}/v1/models", timeout=10)
assert resp.status_code == 200
data = resp.json()
assert "data" in data
assert len(data["data"]) > 0
def test_tool_call_completion(self):
"""Model must return a valid tool_call for a read_file prompt."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
}
]
resp = requests.post(
f"{url}/v1/chat/completions",
json={
"model": "gemma-4",
"messages": [
{"role": "user", "content": "Read the file at /tmp/test.txt"}
],
"tools": tools,
"tool_choice": "auto",
},
timeout=120,
)
assert resp.status_code == 200
data = resp.json()
choice = data["choices"][0]
msg = choice["message"]
if "tool_calls" in msg and msg["tool_calls"]:
tc = msg["tool_calls"][0]
assert tc["type"] == "function"
assert tc["function"]["name"] == "read_file"
args = json.loads(tc["function"]["arguments"])
assert "path" in args
else:
assert len(msg.get("content", "")) > 0
def test_tool_call_with_multiple_tools(self):
"""Model must handle multiple available tools."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web",
"parameters": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "execute_code",
"description": "Run Python code",
"parameters": {
"type": "object",
"properties": {"code": {"type": "string"}},
"required": ["code"],
},
},
},
]
resp = requests.post(
f"{url}/v1/chat/completions",
json={
"model": "gemma-4",
"messages": [
{"role": "user", "content": "Search the web for 'bitcoin price'"}
],
"tools": tools,
"tool_choice": "auto",
},
timeout=120,
)
assert resp.status_code == 200
data = resp.json()
assert "choices" in data
assert len(data["choices"]) > 0
if __name__ == "__main__":
unittest.main()

View File

@@ -1,189 +0,0 @@
#!/usr/bin/env python3
"""
Unit tests for benchmarks/test_tool_calling.py
Tests the validation logic and report generation without
requiring a live model backend.
"""
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "benchmarks"))
import test_tool_calling as tc
# ── JSON Extraction ───────────────────────────────────────────────────
class TestExtractJson:
def test_direct_json(self):
obj = tc._extract_json('{"name": "read_file", "arguments": {"path": "/etc/hostname"}}')
assert obj["name"] == "read_file"
def test_json_in_code_block(self):
text = 'Here is the call:\n```json\n{"name": "terminal", "arguments": {"command": "ls"}}\n```'
obj = tc._extract_json(text)
assert obj["name"] == "terminal"
def test_json_without_lang(self):
text = '```\n{"name": "web_search", "arguments": {"query": "test"}}\n```'
obj = tc._extract_json(text)
assert obj["name"] == "web_search"
def test_no_json(self):
obj = tc._extract_json("I can't help with that.")
assert obj is None
def test_bare_json_object(self):
text = 'Sure, here: {"name": "read_file", "arguments": {"path": "/tmp/x"}} for you.'
obj = tc._extract_json(text)
assert obj is not None
assert obj["name"] == "read_file"
# ── Tool Call Validation ──────────────────────────────────────────────
class TestToolCallValidation:
def test_exact_match(self):
resp = '{"name": "read_file", "arguments": {"path": "/etc/hostname"}}'
result = tc._has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"})
assert result["passed"] is True
def test_wrong_tool_name(self):
resp = '{"name": "write_file", "arguments": {"path": "/etc/hostname"}}'
result = tc._has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"})
assert result["passed"] is False
assert "wrong tool name" in result["reason"]
def test_missing_argument(self):
resp = '{"name": "read_file", "arguments": {}}'
result = tc._has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"})
assert result["passed"] is False
assert "missing argument" in result["reason"]
def test_wrong_argument_value(self):
resp = '{"name": "read_file", "arguments": {"path": "/etc/passwd"}}'
result = tc._has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"})
assert result["passed"] is False
assert "argument mismatch" in result["reason"]
def test_no_json_response(self):
result = tc._has_json_tool_call("Sorry, I can't do that.", "read_file", {"path": "/etc/hostname"})
assert result["passed"] is False
assert "no JSON" in result["reason"]
def test_nested_function_format(self):
resp = '{"function": {"name": "terminal", "arguments": {"command": "echo hello"}}}'
result = tc._has_json_tool_call(resp, "terminal", {"command": "echo hello"})
assert result["passed"] is True
# ── Nested Schema Validation ──────────────────────────────────────────
class TestNestedSchemaValidation:
def test_valid_nested(self):
resp = json.dumps({
"name": "deploy_service",
"arguments": {
"name": "api-gateway",
"replicas": 3,
"env": {"PORT": 8080, "NODE_ENV": "production"},
"resources": {"cpu": "500m", "memory": "256Mi"}
}
})
result = tc._has_nested_tool_call(resp)
assert result["passed"] is True
def test_missing_nested_key(self):
resp = '{"name": "deploy_service", "arguments": {"name": "api-gateway", "replicas": 3}}'
result = tc._has_nested_tool_call(resp)
assert result["passed"] is False
assert "missing nested key" in result["reason"]
def test_wrong_type(self):
resp = '{"name": "deploy_service", "arguments": {"name": "api-gateway", "replicas": "three", "env": {}, "resources": {}}}'
result = tc._has_nested_tool_call(resp)
assert result["passed"] is False
assert "should be int" in result["reason"]
def test_missing_env_port(self):
resp = json.dumps({
"name": "deploy_service",
"arguments": {"name": "api", "replicas": 1, "env": {"NODE_ENV": "dev"}, "resources": {}}
})
result = tc._has_nested_tool_call(resp)
assert result["passed"] is False
assert "PORT" in result["reason"]
# ── Markdown Report Generation ────────────────────────────────────────
class TestMarkdownReport:
def test_report_structure(self):
results = {
"model": "test-model",
"backend": "ollama",
"url": "http://localhost:11434",
"timestamp": "2026-04-15T00:00:00Z",
"tests": [
{"id": "t1", "category": "Simple", "description": "Test 1",
"passed": True, "reason": "ok", "response": "{}", "latency_s": 1.0, "tokens": 10},
{"id": "t2", "category": "Complex", "description": "Test 2",
"passed": False, "reason": "wrong name", "response": "oops", "latency_s": 2.0, "tokens": 20},
],
"summary": {"total": 2, "passed": 1, "failed": 1, "errors": 0},
}
md = tc.to_markdown(results)
assert "test-model" in md
assert "1/2 passed" in md
assert "PASS" in md
assert "FAIL" in md
assert "Failure Analysis" in md
def test_perfect_score(self):
results = {
"model": "perfect", "backend": "ollama", "url": "http://x",
"timestamp": "2026-01-01T00:00:00Z",
"tests": [
{"id": "t1", "category": "C", "description": "D",
"passed": True, "reason": "ok", "response": "{}", "latency_s": 1, "tokens": 5},
],
"summary": {"total": 1, "passed": 1, "failed": 0, "errors": 0},
}
md = tc.to_markdown(results)
assert "FULLY VIABLE" in md
def test_all_failed(self):
results = {
"model": "bad", "backend": "ollama", "url": "http://x",
"timestamp": "2026-01-01T00:00:00Z",
"tests": [
{"id": "t1", "category": "C", "description": "D",
"passed": False, "reason": "broken", "response": "nope", "latency_s": 1, "tokens": 0},
],
"summary": {"total": 1, "passed": 0, "failed": 1, "errors": 0},
}
md = tc.to_markdown(results)
assert "NOT VIABLE" in md
# ── Test Definitions ──────────────────────────────────────────────────
class TestTestDefinitions:
def test_all_tests_have_validators(self):
for test in tc.TESTS:
assert callable(test["validate"]), f"{test['id']} missing validate"
assert "id" in test
assert "category" in test
assert "prompt" in test
def test_five_test_categories(self):
categories = {t["category"] for t in tc.TESTS}
assert len(categories) >= 4, f"Expected 4+ categories, got {categories}"
if __name__ == "__main__":
pytest.main([__file__, "-v"])