Compare commits

..

8 Commits

Author SHA1 Message Date
590c4c7820 test: add unit tests for tool calling test runner (#101)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 12s
2026-04-16 01:47:37 +00:00
629be9714f docs: add tool calling results template (#101) 2026-04-16 01:47:35 +00:00
3123d1fa8e feat: tool calling viability test suite for 1-bit models (#101) 2026-04-16 01:45:48 +00:00
3cd8750cbb Merge pull request 'feat: standalone build system and roundtrip tests - #17' (#51) from dispatch/17-1776180746 into main
All checks were successful
Smoke Test / smoke (pull_request) Successful in 15s
2026-04-15 11:57:58 +00:00
ef765bbd30 Merge pull request 'fix(docs): resolve broken markdown links and stale forge URL' (#52) from burn/fix-doc-links into main 2026-04-15 11:57:55 +00:00
Hermes Agent
5f0d00f127 fix(docs): resolve broken markdown links and stale forge URL
All checks were successful
Smoke Test / smoke (pull_request) Successful in 6s
- Update raw-IP forge URL to canonical forge domain in README.md
  (fixes #46)
- Update 4 broken local markdown links pointing to deleted
  BUILD-SPEC.md, PHASE1-REPORT.md, FULL-REPORT.md to
  docs/PROJECT_STATUS.md (fixes #44)
2026-04-14 18:07:25 -04:00
Alexander Whitestone
8affe79489 cleanup: remove committed .pyc and redundant Python test, add .gitignore
All checks were successful
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-14 11:34:38 -04:00
Alexander Whitestone
319f57780d feat: add standalone build system and roundtrip tests (Issue #17)
- CMakeLists.txt: builds turboquant as static library
- TURBOQUANT_BUILD_TESTS option enables ctest roundtrip tests
- tests/roundtrip_test.cpp: validates zero-vector roundtrip and
  gaussian cosine similarity (>=0.99)
- Makefile wrapper for convenience (build/test/clean targets)
- Addresses contributor feedback on spec-to-code gap and CI from #17
2026-04-14 11:34:38 -04:00
10 changed files with 820 additions and 442 deletions

3
.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
build/
*.pyc
__pycache__/

36
CMakeLists.txt Normal file
View File

@@ -0,0 +1,36 @@
cmake_minimum_required(VERSION 3.16)
project(turboquant LANGUAGES CXX)
option(TURBOQUANT_BUILD_TESTS "Build standalone TurboQuant validation tests" ON)
add_library(turboquant STATIC
llama-turbo.cpp
)
target_include_directories(turboquant PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
)
target_compile_features(turboquant PUBLIC cxx_std_17)
if(MSVC)
target_compile_options(turboquant PRIVATE /W4)
else()
target_compile_options(turboquant PRIVATE -Wall -Wextra -Wpedantic)
endif()
if(TURBOQUANT_BUILD_TESTS)
include(CTest)
add_executable(turboquant_roundtrip_test
tests/roundtrip_test.cpp
)
target_link_libraries(turboquant_roundtrip_test PRIVATE turboquant)
target_compile_features(turboquant_roundtrip_test PRIVATE cxx_std_17)
add_test(
NAME turboquant_roundtrip
COMMAND turboquant_roundtrip_test
)
endif()

View File

@@ -13,7 +13,7 @@ Unlock 64K-128K context on qwen3.5:27b within 32GB unified memory.
A 27B model at 128K context with TurboQuant beats a 72B at Q2 with 8K context.
## Status
See [issues](http://143.198.27.163:3000/Timmy_Foundation/turboquant/issues) for current progress.
See [issues](https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant/issues) for current progress.
## Roles
- **Strago:** Build spec author
@@ -29,4 +29,4 @@ See [issues](http://143.198.27.163:3000/Timmy_Foundation/turboquant/issues) for
- [rachittshah/mlx-turboquant](https://github.com/rachittshah/mlx-turboquant) — MLX fallback
## Docs
- [BUILD-SPEC.md](BUILD-SPEC.md) — Full build specification (Strago, v2.2)
- [Project Status](docs/PROJECT_STATUS.md) — Full project status and build specification

View File

@@ -0,0 +1,50 @@
# Tool Calling Viability: Bonsai 1-Bit Models
**Epic**: #99 (1-Bit Models + Edge)
**Date**: TBD (run benchmarks/test_tool_calling.py to populate)
## Hypothesis
1-bit quantization destroys fine-grained reasoning. Tool calling (precise JSON output) may be impossible at Q1_0. But worth testing — the field is moving fast.
## Models to Test
| Model | Size | Quant | Source |
|-------|------|-------|--------|
| Bonsai-1.7B | 1.7B | Q1_0 | prism-ml/Bonsai-1.7B-gguf |
| Bonsai-4B | 4B | Q1_0 | prism-ml/Bonsai-4B-gguf |
| Bonsai-8B | 8B | Q1_0 | prism-ml/Bonsai-8B-gguf |
## Test Suite
| # | Test | Category | Description |
|---|------|----------|-------------|
| 1 | simple_file_read | Simple Tool Call | Read a file with an exact path |
| 2 | terminal_command | Terminal Command | Execute a shell command |
| 3 | web_search | Web Search | Search the web for a query |
| 4 | multi_step_chain | Multi-Step | Chain: read -> analyze -> write |
| 5 | nested_schema | Schema Parsing | Complex nested parameters |
## Results
> **Run**: `python3 benchmarks/test_tool_calling.py --model bonsai-1.7b --output benchmarks/bonsai-tool-calling.md`
| Test | Bonsai-1.7B | Bonsai-4B | Bonsai-8B |
|------|-------------|-----------|-----------|
| simple_file_read | TBD | TBD | TBD |
| terminal_command | TBD | TBD | TBD |
| web_search | TBD | TBD | TBD |
| multi_step_chain | TBD | TBD | TBD |
| nested_schema | TBD | TBD | TBD |
## Verdict
TBD — run the test suite to populate.
## Failure Modes (if any)
TBD — document specific failure patterns observed.
## Recommendations
TBD — based on results, recommend minimum viable quantization level for tool calling.

View File

@@ -1,319 +0,0 @@
#!/usr/bin/env python3
"""
TurboQuant Constant-Time Benchmark — Issue #72
Benchmarks constant-time (side-channel resistant) vs original quantization.
Measures encode latency, decode latency, and memory bandwidth impact.
Usage:
python3 benchmarks/constant_time_benchmark.py --size 4096 --iterations 100
python3 benchmarks/constant_time_benchmark.py --json
"""
import argparse
import json
import os
import statistics
import sys
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Callable
# ---------------------------------------------------------------------------
# Quantization kernels (Python reference implementations)
# ---------------------------------------------------------------------------
import struct
import math
def quantize_fp16_to_q4_0_original(weights: list[float]) -> bytes:
"""Original quantization: FP16 → Q4_0 (block size 32).
Each block: 2 bytes scale (FP16) + 16 bytes quants (4-bit packed).
Non-constant-time: early exits, branching on zero detection.
"""
block_size = 32
n_blocks = len(weights) // block_size
output = bytearray()
for b in range(n_blocks):
block = weights[b * block_size:(b + 1) * block_size]
# Find absmax
absmax = 0.0
for w in block:
absmax = max(absmax, abs(w))
if absmax == 0.0:
# Early exit — branch prediction leak
output.extend(struct.pack('<e', 0.0))
output.extend(bytes(16))
continue
d = absmax / 7.0 # scale
id_val = 1.0 / d if d != 0 else 0.0 # Branch on zero
# Pack 4-bit quants
packed = bytearray(16)
for i in range(0, block_size, 2):
xi0 = int(round(block[i] * id_val)) + 8
xi1 = int(round(block[i + 1] * id_val)) if i + 1 < block_size else 8
xi0 = max(0, min(15, xi0))
xi1 = max(0, min(15, xi1))
packed[i // 2] = xi0 | (xi1 << 4)
output.extend(struct.pack('<e', d))
output.extend(packed)
return bytes(output)
def quantize_fp16_to_q4_0_constant_time(weights: list[float]) -> bytes:
"""Constant-time quantization: FP16 → Q4_0.
No early exits, no branches on data values. Same output as original
but timing does not leak information about weight distribution.
"""
block_size = 32
n_blocks = len(weights) // block_size
output = bytearray()
for b in range(n_blocks):
block = weights[b * block_size:(b + 1) * block_size]
# Find absmax — no early exit on zero
absmax = 0.0
for w in block:
absval = abs(w)
# Constant-time max: no branch, always compute both paths
absmax = absval if absval > absmax else absmax
# Constant-time scale computation — no branch on zero
d = absmax / 7.0
# Constant-time inverse: compute 1/d but guard against zero
d_nonzero = 1.0 if d != 0.0 else 0.0
safe_d = d if d != 0.0 else 1.0 # Avoid division by zero
id_val = (1.0 / safe_d) * d_nonzero
# Always compute quants (even when scale=0, producing all zeros)
packed = bytearray(16)
for i in range(0, block_size, 2):
xi0 = int(round(block[i] * id_val)) + 8
xi1 = int(round(block[i + 1] * id_val)) + 8 if i + 1 < block_size else 8
# Constant-time clamp: no branch
xi0 = max(0, min(15, xi0))
xi1 = max(0, min(15, xi1))
packed[i // 2] = xi0 | (xi1 << 4)
output.extend(struct.pack('<e', d))
output.extend(packed)
return bytes(output)
def dequantize_q4_0_original(data: bytes, n: int) -> list[float]:
"""Original dequantization: Q4_0 → FP32."""
block_size = 32
bytes_per_block = 18 # 2 scale + 16 quants
n_blocks = n // block_size
weights = []
for b in range(n_blocks):
offset = b * bytes_per_block
d = struct.unpack_from('<e', data, offset)[0]
quants = data[offset + 2:offset + 18]
for i in range(16):
byte_val = quants[i]
xi0 = (byte_val & 0x0F) - 8
xi1 = ((byte_val >> 4) & 0x0F) - 8
weights.append(xi0 * d)
if len(weights) < n:
weights.append(xi1 * d)
return weights[:n]
def dequantize_q4_0_constant_time(data: bytes, n: int) -> list[float]:
"""Constant-time dequantization: Q4_0 → FP32."""
block_size = 32
bytes_per_block = 18
n_blocks = n // block_size
weights = []
for b in range(n_blocks):
offset = b * bytes_per_block
d = struct.unpack_from('<e', data, offset)[0]
quants = data[offset + 2:offset + 18]
# Always process all 16 bytes, even if we've exceeded n
for i in range(16):
byte_val = quants[i]
xi0 = (byte_val & 0x0F) - 8
xi1 = ((byte_val >> 4) & 0x0F) - 8
if len(weights) < n:
weights.append(xi0 * d)
if len(weights) < n:
weights.append(xi1 * d)
return weights[:n]
# ---------------------------------------------------------------------------
# Benchmark harness
# ---------------------------------------------------------------------------
def benchmark(fn: Callable, args: tuple, iterations: int) -> dict:
"""Benchmark a function over N iterations."""
# Warmup
for _ in range(min(3, iterations)):
fn(*args)
latencies = []
for _ in range(iterations):
start = time.perf_counter()
fn(*args)
elapsed = time.perf_counter() - start
latencies.append(elapsed * 1000) # ms
return {
"iterations": iterations,
"mean_ms": round(statistics.mean(latencies), 4),
"median_ms": round(statistics.median(latencies), 4),
"std_ms": round(statistics.stdev(latencies) if len(latencies) > 1 else 0, 4),
"min_ms": round(min(latencies), 4),
"max_ms": round(max(latencies), 4),
"p95_ms": round(sorted(latencies)[int(len(latencies) * 0.95)], 4),
"p99_ms": round(sorted(latencies)[int(len(latencies) * 0.99)], 4),
}
def generate_weights(size: int) -> list[float]:
"""Generate test weights."""
import random
random.seed(42)
return [random.gauss(0, 1) for _ in range(size)]
def run_benchmarks(size: int, iterations: int) -> dict:
"""Run full benchmark suite."""
weights = generate_weights(size)
print(f"Benchmarking {size} weights x {iterations} iterations...", file=sys.stderr)
# Encode benchmarks
print(" Encode original...", file=sys.stderr)
encode_orig = benchmark(quantize_fp16_to_q4_0_original, (weights,), iterations)
print(" Encode constant-time...", file=sys.stderr)
encode_ct = benchmark(quantize_fp16_to_q4_0_constant_time, (weights,), iterations)
# Decode benchmarks
encoded_orig = quantize_fp16_to_q4_0_original(weights)
print(" Decode original...", file=sys.stderr)
decode_orig = benchmark(dequantize_q4_0_original, (encoded_orig, size), iterations)
encoded_ct = quantize_fp16_to_q4_0_constant_time(weights)
print(" Decode constant-time...", file=sys.stderr)
decode_ct = benchmark(dequantize_q4_0_constant_time, (encoded_ct, size), iterations)
# Correctness check
decoded_orig = dequantize_q4_0_original(encoded_orig, size)
decoded_ct = dequantize_q4_0_constant_time(encoded_ct, size)
max_diff = max(abs(a - b) for a, b in zip(decoded_orig, decoded_ct))
# Overhead analysis
encode_overhead = (encode_ct["mean_ms"] / max(encode_orig["mean_ms"], 0.001) - 1) * 100
decode_overhead = (decode_ct["mean_ms"] / max(decode_orig["mean_ms"], 0.001) - 1) * 100
return {
"generated_at": datetime.now(timezone.utc).isoformat(),
"config": {"weight_count": size, "iterations": iterations, "block_size": 32},
"encode": {"original": encode_orig, "constant_time": encode_ct},
"decode": {"original": decode_orig, "constant_time": decode_ct},
"correctness": {
"max_decode_diff": round(max_diff, 10),
"outputs_match": max_diff < 1e-6,
},
"overhead": {
"encode_pct": round(encode_overhead, 2),
"decode_pct": round(decode_overhead, 2),
},
"memory": {
"original_bytes": len(encoded_orig),
"constant_time_bytes": len(encoded_ct),
"compression_ratio": round(size * 4 / len(encoded_orig), 2),
},
}
def to_markdown(report: dict) -> str:
enc = report["encode"]
dec = report["decode"]
ov = report["overhead"]
mem = report["memory"]
cor = report["correctness"]
lines = [
"# Constant-Time Benchmark Report",
"",
f"Generated: {report['generated_at'][:16]}",
f"Config: {report['config']['weight_count']} weights, {report['config']['iterations']} iterations",
"",
"## Encode Latency",
"",
"| Impl | Mean (ms) | Median | P95 | P99 | Overhead |",
"|------|-----------|--------|-----|-----|----------|",
f"| Original | {enc['original']['mean_ms']:.2f} | {enc['original']['median_ms']:.2f} | {enc['original']['p95_ms']:.2f} | {enc['original']['p99_ms']:.2f} | baseline |",
f"| Constant-time | {enc['constant_time']['mean_ms']:.2f} | {enc['constant_time']['median_ms']:.2f} | {enc['constant_time']['p95_ms']:.2f} | {enc['constant_time']['p99_ms']:.2f} | +{ov['encode_pct']:.1f}% |",
"",
"## Decode Latency",
"",
"| Impl | Mean (ms) | Median | P95 | P99 | Overhead |",
"|------|-----------|--------|-----|-----|----------|",
f"| Original | {dec['original']['mean_ms']:.2f} | {dec['original']['median_ms']:.2f} | {dec['original']['p95_ms']:.2f} | {dec['original']['p99_ms']:.2f} | baseline |",
f"| Constant-time | {dec['constant_time']['mean_ms']:.2f} | {dec['constant_time']['median_ms']:.2f} | {dec['constant_time']['p95_ms']:.2f} | {dec['constant_time']['p99_ms']:.2f} | +{ov['decode_pct']:.1f}% |",
"",
"## Correctness",
"",
f"- Max decode difference: {cor['max_decode_diff']:.10f}",
f"- Outputs match: {'✅ Yes' if cor['outputs_match'] else '❌ No'}",
"",
"## Memory",
"",
f"- Compressed size: {mem['original_bytes']} bytes ({mem['compression_ratio']:.1f}x compression)",
f"- Constant-time size: {mem['constant_time_bytes']} bytes (same format)",
"",
"## Verdict",
"",
]
if ov['encode_pct'] < 10 and ov['decode_pct'] < 10:
lines.append("**Constant-time overhead is acceptable (<10%).** Safe for production.")
elif ov['encode_pct'] < 25 and ov['decode_pct'] < 25:
lines.append("**Constant-time overhead is moderate (10-25%).** Acceptible for security-sensitive deployments.")
else:
lines.append("**Constant-time overhead is significant (>25%).** Consider optimizing or using original for non-sensitive workloads.")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Constant-time benchmark")
parser.add_argument("--size", type=int, default=4096, help="Weight count")
parser.add_argument("--iterations", type=int, default=100, help="Iterations")
parser.add_argument("--json", action="store_true", help="JSON output")
args = parser.parse_args()
report = run_benchmarks(args.size, args.iterations)
if args.json:
print(json.dumps(report, indent=2))
else:
print(to_markdown(report))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,435 @@
#!/usr/bin/env python3
"""
Tool Calling Viability Test for 1-Bit / Edge Models (Issue #101)
Tests whether Bonsai 1-bit models (or any small model) can produce
valid tool calls via Ollama or llama-server API.
Test suite (5 categories):
1. Simple tool call: file read with exact path
2. Terminal command execution
3. Web search
4. Multi-step: read file -> analyze -> write result
5. Schema parsing: complex nested parameters
Each test:
- Sends a prompt requesting a tool call
- Checks if the response contains valid JSON tool call syntax
- Scores structural validity + semantic accuracy
- Records latency and token count
Usage:
python3 benchmarks/test_tool_calling.py --model bonsai-1.7b
python3 benchmarks/test_tool_calling.py --model qwen3.5 --backend llama-server --url http://localhost:8080
python3 benchmarks/test_tool_calling.py --model bonsai-1.7b --output benchmarks/bonsai-tool-calling.md
"""
import argparse
import json
import os
import re
import sys
import time
from datetime import datetime, timezone
from typing import Optional
try:
import requests
except ImportError:
print("Error: pip install requests", file=sys.stderr)
sys.exit(1)
# ── Tool call test definitions ────────────────────────────────────────
TESTS = [
{
"id": "simple_file_read",
"category": "Simple Tool Call",
"description": "Read a file with an exact path",
"prompt": (
"You have access to a tool called read_file. "
"Call it to read /etc/hostname. "
"Respond ONLY with a JSON tool call in this exact format:\n"
'{"name": "read_file", "arguments": {"path": "/etc/hostname"}}'
),
"validate": lambda resp: _has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"}),
"scoring": "structural", # Can we parse the JSON at all?
},
{
"id": "terminal_command",
"category": "Terminal Command",
"description": "Execute a shell command",
"prompt": (
"You have access to a tool called terminal. "
"Call it to run the command: echo hello world. "
"Respond ONLY with a JSON tool call:\n"
'{"name": "terminal", "arguments": {"command": "echo hello world"}}'
),
"validate": lambda resp: _has_json_tool_call(resp, "terminal", {"command": "echo hello world"}),
"scoring": "structural",
},
{
"id": "web_search",
"category": "Web Search",
"description": "Search the web for a query",
"prompt": (
"You have access to a tool called web_search. "
"Search for: what is quantization in machine learning. "
"Respond ONLY with a JSON tool call:\n"
'{"name": "web_search", "arguments": {"query": "what is quantization in machine learning"}}'
),
"validate": lambda resp: _has_json_tool_call(resp, "web_search", {"query": "what is quantization in machine learning"}),
"scoring": "structural",
},
{
"id": "multi_step_chain",
"category": "Multi-Step",
"description": "Chain: read file -> analyze -> write result",
"prompt": (
"You have access to these tools: read_file, write_file.\n"
"Task: Read /tmp/input.txt, count the words, then write the count to /tmp/count.txt.\n"
"First, call read_file on /tmp/input.txt. "
"Respond ONLY with the first tool call as JSON:\n"
'{"name": "read_file", "arguments": {"path": "/tmp/input.txt"}}'
),
"validate": lambda resp: _has_json_tool_call(resp, "read_file", {"path": "/tmp/input.txt"}),
"scoring": "structural",
},
{
"id": "nested_schema",
"category": "Schema Parsing",
"description": "Complex nested parameters",
"prompt": (
"You have access to a tool called deploy_service. "
"Deploy a service with:\n"
'- name: "api-gateway"\n'
'- replicas: 3\n'
'- env: {"PORT": 8080, "NODE_ENV": "production"}\n'
'- resources: {"cpu": "500m", "memory": "256Mi"}\n\n'
"Respond ONLY with a JSON tool call:\n"
'{"name": "deploy_service", "arguments": {"name": "api-gateway", "replicas": 3, '
'"env": {"PORT": 8080, "NODE_ENV": "production"}, '
'"resources": {"cpu": "500m", "memory": "256Mi"}}}'
),
"validate": lambda resp: _has_nested_tool_call(resp),
"scoring": "semantic", # Needs correct nested structure
},
]
# ── Validation helpers ────────────────────────────────────────────────
def _extract_json(text: str) -> Optional[dict]:
"""Try to extract a JSON object from text."""
# Try direct parse
text = text.strip()
try:
obj = json.loads(text)
if isinstance(obj, dict):
return obj
except json.JSONDecodeError:
pass
# Try finding JSON in code blocks
code_block = re.search(r"```(?:json)?\s*({.*?})\s*```", text, re.DOTALL)
if code_block:
try:
return json.loads(code_block.group(1))
except json.JSONDecodeError:
pass
# Try finding any JSON object
json_match = re.search(r"({[^{}]*(?:{[^{}]*}[^{}]*)*})", text)
if json_match:
try:
return json.loads(json_match.group(1))
except json.JSONDecodeError:
pass
return None
def _has_json_tool_call(resp: str, expected_name: str, expected_args: dict) -> dict:
"""Check if response contains a valid tool call with expected name and args."""
obj = _extract_json(resp)
if obj is None:
return {"passed": False, "reason": "no JSON found in response"}
# Check name
name = obj.get("name", obj.get("function", {}).get("name", ""))
if name != expected_name:
return {"passed": False, "reason": f"wrong tool name: {name!r}, expected {expected_name!r}"}
# Check arguments exist
args = obj.get("arguments", obj.get("function", {}).get("arguments", obj.get("args", {})))
if not args:
return {"passed": False, "reason": "no arguments found"}
# Check key arguments match
for key, val in expected_args.items():
if key not in args:
return {"passed": False, "reason": f"missing argument: {key}"}
if args[key] != val:
return {"passed": False, "reason": f"argument mismatch: {key}={args[key]!r}, expected {val!r}"}
return {"passed": True, "reason": "tool call valid", "parsed": obj}
def _has_nested_tool_call(resp: str) -> dict:
"""Check if response contains a valid tool call with nested parameters."""
obj = _extract_json(resp)
if obj is None:
return {"passed": False, "reason": "no JSON found in response"}
name = obj.get("name", obj.get("function", {}).get("name", ""))
if name != "deploy_service":
return {"passed": False, "reason": f"wrong tool name: {name!r}"}
args = obj.get("arguments", obj.get("function", {}).get("arguments", obj.get("args", {})))
if not args:
return {"passed": False, "reason": "no arguments found"}
checks = {
"name": str,
"replicas": int,
"env": dict,
"resources": dict,
}
for key, expected_type in checks.items():
if key not in args:
return {"passed": False, "reason": f"missing nested key: {key}"}
if not isinstance(args[key], expected_type):
return {"passed": False, "reason": f"{key} should be {expected_type.__name__}, got {type(args[key]).__name__}"}
# Check env has PORT
env = args.get("env", {})
if "PORT" not in env:
return {"passed": False, "reason": "env missing PORT"}
return {"passed": True, "reason": "nested tool call valid", "parsed": obj}
# ── Backend runners ───────────────────────────────────────────────────
def run_ollama(prompt: str, model: str, url: str, timeout: int = 120) -> dict:
"""Run a prompt against Ollama."""
api_url = f"{url.rstrip('/')}/api/generate"
start = time.time()
try:
resp = requests.post(api_url, json={
"model": model,
"prompt": prompt,
"stream": False,
"options": {"num_predict": 256, "temperature": 0}
}, timeout=timeout)
elapsed = time.time() - start
resp.raise_for_status()
data = resp.json()
return {
"response": data.get("response", ""),
"latency_s": round(elapsed, 3),
"tokens": data.get("eval_count", 0),
"status": "success",
}
except Exception as e:
return {"response": "", "latency_s": round(time.time() - start, 3), "tokens": 0, "status": "failed", "error": str(e)}
def run_llama_server(prompt: str, model: str, url: str, timeout: int = 120) -> dict:
"""Run a prompt against llama-server (OpenAI-compatible)."""
api_url = f"{url.rstrip('/')}/v1/chat/completions"
start = time.time()
try:
resp = requests.post(api_url, json={
"model": model,
"messages": [
{"role": "system", "content": "You are a tool-calling assistant. Respond ONLY with JSON tool calls."},
{"role": "user", "content": prompt},
],
"max_tokens": 256,
"temperature": 0,
"stream": False,
}, timeout=timeout)
elapsed = time.time() - start
resp.raise_for_status()
data = resp.json()
content = data.get("choices", [{}])[0].get("message", {}).get("content", "")
usage = data.get("usage", {})
return {
"response": content,
"latency_s": round(elapsed, 3),
"tokens": usage.get("completion_tokens", 0),
"status": "success",
}
except Exception as e:
return {"response": "", "latency_s": round(time.time() - start, 3), "tokens": 0, "status": "failed", "error": str(e)}
# ── Main runner ───────────────────────────────────────────────────────
def run_tests(model: str, backend: str = "ollama", url: str = "http://localhost:11434",
timeout: int = 120, verbose: bool = False) -> dict:
"""Run the full tool calling test suite."""
runner_fn = run_ollama if backend == "ollama" else run_llama_server
results = {
"model": model,
"backend": backend,
"url": url,
"timestamp": datetime.now(timezone.utc).isoformat(),
"tests": [],
"summary": {"total": 0, "passed": 0, "failed": 0, "errors": 0},
}
print(f"Testing tool calling on: {model} ({backend})\n")
for test in TESTS:
print(f" [{test['id']}] {test['description']}...", end=" ", flush=True)
run_result = runner_fn(test["prompt"], model, url, timeout)
if run_result["status"] == "failed":
result = {
"id": test["id"],
"category": test["category"],
"description": test["description"],
"passed": False,
"reason": f"backend error: {run_result.get('error', 'unknown')}",
"response": "",
"latency_s": run_result["latency_s"],
"tokens": 0,
}
results["summary"]["errors"] += 1
print("ERROR")
else:
validation = test["validate"](run_result["response"])
result = {
"id": test["id"],
"category": test["category"],
"description": test["description"],
"passed": validation["passed"],
"reason": validation["reason"],
"response": run_result["response"][:500],
"latency_s": run_result["latency_s"],
"tokens": run_result["tokens"],
}
if validation["passed"]:
results["summary"]["passed"] += 1
print("PASS")
else:
results["summary"]["failed"] += 1
print(f"FAIL ({validation['reason']})")
if verbose:
print(f" Response: {run_result['response'][:200]}")
results["summary"]["total"] += 1
results["tests"].append(result)
return results
def to_markdown(results: dict) -> str:
"""Format test results as a markdown report."""
lines = []
lines.append(f"# Tool Calling Viability: {results['model']}")
lines.append("")
lines.append(f"**Date**: {results['timestamp']}")
lines.append(f"**Backend**: {results['backend']} ({results['url']})")
lines.append(f"**Model**: {results['model']}")
lines.append("")
s = results["summary"]
pass_rate = s["passed"] / s["total"] * 100 if s["total"] > 0 else 0
lines.append(f"## Summary: {s['passed']}/{s['total']} passed ({pass_rate:.0f}%)")
lines.append("")
lines.append(f"| Metric | Value |")
lines.append(f"|--------|-------|")
lines.append(f"| Total tests | {s['total']} |")
lines.append(f"| Passed | {s['passed']} |")
lines.append(f"| Failed | {s['failed']} |")
lines.append(f"| Errors | {s['errors']} |")
lines.append("")
lines.append("## Results by Category")
lines.append("")
lines.append("| Test | Category | Result | Reason | Latency | Tokens |")
lines.append("|------|----------|--------|--------|---------|--------|")
for t in results["tests"]:
icon = "PASS" if t["passed"] else ("ERROR" if "error" in t["reason"].lower() else "FAIL")
lines.append(f"| {t['id']} | {t['category']} | {icon} | {t['reason']} | {t['latency_s']}s | {t['tokens']} |")
lines.append("")
lines.append("## Verdict")
lines.append("")
if pass_rate == 100:
lines.append("**FULLY VIABLE** — All tool calling patterns work. Ready for production edge deployment.")
elif pass_rate >= 60:
lines.append("**PARTIALLY VIABLE** — Basic tool calling works, complex patterns may fail. Consider for simple agents.")
elif pass_rate >= 20:
lines.append("**MARGINAL** — Only simplest tool calls work. Not recommended for production.")
else:
lines.append("**NOT VIABLE** — Tool calling is fundamentally broken at this quantization level.")
lines.append("")
lines.append("## Failure Analysis")
lines.append("")
failed = [t for t in results["tests"] if not t["passed"]]
if not failed:
lines.append("No failures.")
else:
for t in failed:
lines.append(f"### {t['id']}")
lines.append(f"- **Category**: {t['category']}")
lines.append(f"- **Failure**: {t['reason']}")
lines.append(f"- **Response** (first 300 chars): `{t['response'][:300]}`")
lines.append("")
lines.append("")
lines.append("## Recommendations")
lines.append("")
if pass_rate >= 80:
lines.append("- Deploy for simple single-tool-call workflows")
lines.append("- Add retry logic for multi-step chains")
lines.append("- Consider prompt engineering to improve nested schema parsing")
elif pass_rate >= 40:
lines.append("- Use for keyword/rule-based tool routing only")
lines.append("- Do NOT use for complex multi-step workflows")
lines.append("- Consider a larger model (Q4 quantized) as fallback")
else:
lines.append("- 1-bit quantization is too lossy for tool calling")
lines.append("- Use Q4_0 as minimum viable quantization for tool use")
lines.append("- Reserve 1-bit models for text generation only")
return "\n".join(lines)
def main():
parser = argparse.ArgumentParser(description="Tool Calling Viability Test for Edge Models")
parser.add_argument("--model", "-m", required=True, help="Model name")
parser.add_argument("--backend", "-b", default="ollama", choices=["ollama", "llama-server"])
parser.add_argument("--url", "-u", default="http://localhost:11434", help="Backend URL")
parser.add_argument("--timeout", "-t", type=int, default=120, help="Timeout per test (seconds)")
parser.add_argument("--output", "-o", help="Output markdown file path")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--verbose", "-v", action="store_true", help="Show full responses")
args = parser.parse_args()
results = run_tests(args.model, args.backend, args.url, args.timeout, args.verbose)
if args.json:
print(json.dumps(results, indent=2))
else:
md = to_markdown(results)
if args.output:
with open(args.output, "w") as f:
f.write(md)
print(f"\nReport written to: {args.output}")
else:
print("\n" + md)
if __name__ == "__main__":
main()

View File

@@ -135,7 +135,5 @@ llama-server -m model.gguf --port 8081 -ctk q8_0 -ctv turbo4 -c 131072
## References
- [TurboQuant Build Spec](../BUILD-SPEC.md)
- [Phase 1 Report](../PHASE1-REPORT.md)
- [Full Knowledge Transfer](../FULL-REPORT.md)
- [Project Status](../docs/PROJECT_STATUS.md)
- [llama.cpp TurboQuant Fork](https://github.com/TheTom/llama-cpp-turboquant)

104
tests/roundtrip_test.cpp Normal file
View File

@@ -0,0 +1,104 @@
#include "llama-turbo.h"
#include <cmath>
#include <cstdint>
#include <iostream>
#include <random>
#include <string>
#include <vector>
namespace {
constexpr int kDim = 128;
constexpr float kCosineThreshold = 0.99f;
constexpr float kZeroTolerance = 1.0e-6f;
[[nodiscard]] bool all_finite(const std::vector<float> & values) {
for (float value : values) {
if (!std::isfinite(value)) {
return false;
}
}
return true;
}
[[nodiscard]] float max_abs(const std::vector<float> & values) {
float best = 0.0f;
for (float value : values) {
best = std::max(best, std::fabs(value));
}
return best;
}
[[nodiscard]] float cosine_similarity(const std::vector<float> & lhs, const std::vector<float> & rhs) {
float dot = 0.0f;
float lhs_norm = 0.0f;
float rhs_norm = 0.0f;
for (int i = 0; i < kDim; ++i) {
dot += lhs[i] * rhs[i];
lhs_norm += lhs[i] * lhs[i];
rhs_norm += rhs[i] * rhs[i];
}
const float denom = std::sqrt(lhs_norm) * std::sqrt(rhs_norm);
return denom == 0.0f ? 1.0f : dot / denom;
}
[[nodiscard]] std::vector<float> roundtrip(const std::vector<float> & input, float & norm_out) {
std::vector<uint8_t> packed(kDim / 2, 0);
norm_out = -1.0f;
polar_quant_encode_turbo4(input.data(), packed.data(), &norm_out, kDim);
std::vector<float> decoded(kDim, 0.0f);
polar_quant_decode_turbo4(packed.data(), decoded.data(), norm_out, kDim);
return decoded;
}
void require(bool condition, const std::string & message) {
if (!condition) {
throw std::runtime_error(message);
}
}
void test_zero_vector_roundtrip() {
std::vector<float> zeros(kDim, 0.0f);
float norm = -1.0f;
const auto decoded = roundtrip(zeros, norm);
require(norm == 0.0f, "zero vector should encode with zero norm");
require(all_finite(decoded), "zero vector decode produced non-finite values");
require(max_abs(decoded) <= kZeroTolerance, "zero vector decode should remain near zero");
}
void test_gaussian_roundtrip_quality() {
std::mt19937 rng(12345);
std::normal_distribution<float> dist(0.0f, 1.0f);
std::vector<float> input(kDim, 0.0f);
for (float & value : input) {
value = dist(rng);
}
float norm = -1.0f;
const auto decoded = roundtrip(input, norm);
require(norm > 0.0f, "random vector should encode with positive norm");
require(all_finite(decoded), "random vector decode produced non-finite values");
const float cosine = cosine_similarity(input, decoded);
require(cosine >= kCosineThreshold, "roundtrip cosine similarity below threshold");
}
} // namespace
int main() {
try {
test_zero_vector_roundtrip();
test_gaussian_roundtrip_quality();
std::cout << "PASS: turboquant standalone roundtrip tests\n";
return 0;
} catch (const std::exception & exc) {
std::cerr << "FAIL: " << exc.what() << '\n';
return 1;
}
}

View File

@@ -1,118 +0,0 @@
"""Tests for constant-time benchmark (Issue #72)."""
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "benchmarks"))
from constant_time_benchmark import (
quantize_fp16_to_q4_0_original,
quantize_fp16_to_q4_0_constant_time,
dequantize_q4_0_original,
dequantize_q4_0_constant_time,
benchmark,
generate_weights,
to_markdown,
)
class TestQuantize:
def test_original_produces_output(self):
weights = [0.1, -0.2, 0.3] * 11 # 33 -> truncate to 32
result = quantize_fp16_to_q4_0_original(weights[:32])
assert len(result) == 18 # 1 block = 2 + 16
def test_constant_time_produces_output(self):
weights = [0.1, -0.2, 0.3] * 11
result = quantize_fp16_to_q4_0_constant_time(weights[:32])
assert len(result) == 18
def test_zero_weights(self):
weights = [0.0] * 32
orig = quantize_fp16_to_q4_0_original(weights)
ct = quantize_fp16_to_q4_0_constant_time(weights)
assert len(orig) == len(ct)
def test_multiple_blocks(self):
weights = [0.1 * i for i in range(128)] # 4 blocks
result = quantize_fp16_to_q4_0_constant_time(weights)
assert len(result) == 4 * 18
class TestDequantize:
def test_roundtrip_original(self):
weights = [0.1 * i for i in range(32)]
encoded = quantize_fp16_to_q4_0_original(weights)
decoded = dequantize_q4_0_original(encoded, 32)
assert len(decoded) == 32
# Q4 is very lossy with small weights — just check structure is correct
assert all(isinstance(w, float) for w in decoded)
def test_roundtrip_constant_time(self):
weights = [0.1 * i for i in range(32)]
encoded = quantize_fp16_to_q4_0_constant_time(weights)
decoded = dequantize_q4_0_constant_time(encoded, 32)
assert len(decoded) == 32
assert all(isinstance(w, float) for w in decoded)
def test_outputs_match(self):
# Use non-zero weights to avoid the zero-scalar early-exit divergence
weights = [0.5, -0.3, 0.8, 0.1] * 8
orig_enc = quantize_fp16_to_q4_0_original(weights)
ct_enc = quantize_fp16_to_q4_0_constant_time(weights)
orig_dec = dequantize_q4_0_original(orig_enc, 32)
ct_dec = dequantize_q4_0_constant_time(ct_enc, 32)
# Q4 quantization is lossy — outputs won't match exactly
# but both should produce valid floats
assert len(orig_dec) == len(ct_dec)
assert all(isinstance(w, float) for w in orig_dec)
assert all(isinstance(w, float) for w in ct_dec)
class TestBenchmark:
def test_returns_stats(self):
result = benchmark(lambda x: x * 2, (5,), 10)
assert "mean_ms" in result
assert "median_ms" in result
assert result["iterations"] == 10
def test_positive_latencies(self):
result = benchmark(lambda: sum(range(1000)), (), 5)
assert result["mean_ms"] > 0
class TestGenerateWeights:
def test_correct_size(self):
w = generate_weights(128)
assert len(w) == 128
def test_deterministic(self):
w1 = generate_weights(64)
w2 = generate_weights(64)
assert w1 == w2
class TestMarkdown:
def test_has_sections(self):
report = {
"generated_at": "2026-04-14T00:00:00",
"config": {"weight_count": 4096, "iterations": 100, "block_size": 32},
"encode": {
"original": {"mean_ms": 1.0, "median_ms": 1.0, "p95_ms": 1.5, "p99_ms": 2.0},
"constant_time": {"mean_ms": 1.1, "median_ms": 1.1, "p95_ms": 1.6, "p99_ms": 2.1},
},
"decode": {
"original": {"mean_ms": 0.5, "median_ms": 0.5, "p95_ms": 0.7, "p99_ms": 0.9},
"constant_time": {"mean_ms": 0.55, "median_ms": 0.55, "p95_ms": 0.75, "p99_ms": 0.95},
},
"correctness": {"max_decode_diff": 0.0, "outputs_match": True},
"overhead": {"encode_pct": 10.0, "decode_pct": 10.0},
"memory": {"original_bytes": 2304, "constant_time_bytes": 2304, "compression_ratio": 5.69},
}
md = to_markdown(report)
assert "Encode Latency" in md
assert "Decode Latency" in md
assert "Correctness" in md

189
tests/test_tool_calling.py Normal file
View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""
Unit tests for benchmarks/test_tool_calling.py
Tests the validation logic and report generation without
requiring a live model backend.
"""
import json
import sys
from pathlib import Path
import pytest
sys.path.insert(0, str(Path(__file__).parent.parent / "benchmarks"))
import test_tool_calling as tc
# ── JSON Extraction ───────────────────────────────────────────────────
class TestExtractJson:
def test_direct_json(self):
obj = tc._extract_json('{"name": "read_file", "arguments": {"path": "/etc/hostname"}}')
assert obj["name"] == "read_file"
def test_json_in_code_block(self):
text = 'Here is the call:\n```json\n{"name": "terminal", "arguments": {"command": "ls"}}\n```'
obj = tc._extract_json(text)
assert obj["name"] == "terminal"
def test_json_without_lang(self):
text = '```\n{"name": "web_search", "arguments": {"query": "test"}}\n```'
obj = tc._extract_json(text)
assert obj["name"] == "web_search"
def test_no_json(self):
obj = tc._extract_json("I can't help with that.")
assert obj is None
def test_bare_json_object(self):
text = 'Sure, here: {"name": "read_file", "arguments": {"path": "/tmp/x"}} for you.'
obj = tc._extract_json(text)
assert obj is not None
assert obj["name"] == "read_file"
# ── Tool Call Validation ──────────────────────────────────────────────
class TestToolCallValidation:
def test_exact_match(self):
resp = '{"name": "read_file", "arguments": {"path": "/etc/hostname"}}'
result = tc._has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"})
assert result["passed"] is True
def test_wrong_tool_name(self):
resp = '{"name": "write_file", "arguments": {"path": "/etc/hostname"}}'
result = tc._has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"})
assert result["passed"] is False
assert "wrong tool name" in result["reason"]
def test_missing_argument(self):
resp = '{"name": "read_file", "arguments": {}}'
result = tc._has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"})
assert result["passed"] is False
assert "missing argument" in result["reason"]
def test_wrong_argument_value(self):
resp = '{"name": "read_file", "arguments": {"path": "/etc/passwd"}}'
result = tc._has_json_tool_call(resp, "read_file", {"path": "/etc/hostname"})
assert result["passed"] is False
assert "argument mismatch" in result["reason"]
def test_no_json_response(self):
result = tc._has_json_tool_call("Sorry, I can't do that.", "read_file", {"path": "/etc/hostname"})
assert result["passed"] is False
assert "no JSON" in result["reason"]
def test_nested_function_format(self):
resp = '{"function": {"name": "terminal", "arguments": {"command": "echo hello"}}}'
result = tc._has_json_tool_call(resp, "terminal", {"command": "echo hello"})
assert result["passed"] is True
# ── Nested Schema Validation ──────────────────────────────────────────
class TestNestedSchemaValidation:
def test_valid_nested(self):
resp = json.dumps({
"name": "deploy_service",
"arguments": {
"name": "api-gateway",
"replicas": 3,
"env": {"PORT": 8080, "NODE_ENV": "production"},
"resources": {"cpu": "500m", "memory": "256Mi"}
}
})
result = tc._has_nested_tool_call(resp)
assert result["passed"] is True
def test_missing_nested_key(self):
resp = '{"name": "deploy_service", "arguments": {"name": "api-gateway", "replicas": 3}}'
result = tc._has_nested_tool_call(resp)
assert result["passed"] is False
assert "missing nested key" in result["reason"]
def test_wrong_type(self):
resp = '{"name": "deploy_service", "arguments": {"name": "api-gateway", "replicas": "three", "env": {}, "resources": {}}}'
result = tc._has_nested_tool_call(resp)
assert result["passed"] is False
assert "should be int" in result["reason"]
def test_missing_env_port(self):
resp = json.dumps({
"name": "deploy_service",
"arguments": {"name": "api", "replicas": 1, "env": {"NODE_ENV": "dev"}, "resources": {}}
})
result = tc._has_nested_tool_call(resp)
assert result["passed"] is False
assert "PORT" in result["reason"]
# ── Markdown Report Generation ────────────────────────────────────────
class TestMarkdownReport:
def test_report_structure(self):
results = {
"model": "test-model",
"backend": "ollama",
"url": "http://localhost:11434",
"timestamp": "2026-04-15T00:00:00Z",
"tests": [
{"id": "t1", "category": "Simple", "description": "Test 1",
"passed": True, "reason": "ok", "response": "{}", "latency_s": 1.0, "tokens": 10},
{"id": "t2", "category": "Complex", "description": "Test 2",
"passed": False, "reason": "wrong name", "response": "oops", "latency_s": 2.0, "tokens": 20},
],
"summary": {"total": 2, "passed": 1, "failed": 1, "errors": 0},
}
md = tc.to_markdown(results)
assert "test-model" in md
assert "1/2 passed" in md
assert "PASS" in md
assert "FAIL" in md
assert "Failure Analysis" in md
def test_perfect_score(self):
results = {
"model": "perfect", "backend": "ollama", "url": "http://x",
"timestamp": "2026-01-01T00:00:00Z",
"tests": [
{"id": "t1", "category": "C", "description": "D",
"passed": True, "reason": "ok", "response": "{}", "latency_s": 1, "tokens": 5},
],
"summary": {"total": 1, "passed": 1, "failed": 0, "errors": 0},
}
md = tc.to_markdown(results)
assert "FULLY VIABLE" in md
def test_all_failed(self):
results = {
"model": "bad", "backend": "ollama", "url": "http://x",
"timestamp": "2026-01-01T00:00:00Z",
"tests": [
{"id": "t1", "category": "C", "description": "D",
"passed": False, "reason": "broken", "response": "nope", "latency_s": 1, "tokens": 0},
],
"summary": {"total": 1, "passed": 0, "failed": 1, "errors": 0},
}
md = tc.to_markdown(results)
assert "NOT VIABLE" in md
# ── Test Definitions ──────────────────────────────────────────────────
class TestTestDefinitions:
def test_all_tests_have_validators(self):
for test in tc.TESTS:
assert callable(test["validate"]), f"{test['id']} missing validate"
assert "id" in test
assert "category" in test
assert "prompt" in test
def test_five_test_categories(self):
categories = {t["category"] for t in tc.TESTS}
assert len(categories) >= 4, f"Expected 4+ categories, got {categories}"
if __name__ == "__main__":
pytest.main([__file__, "-v"])