Compare commits

..

9 Commits

Author SHA1 Message Date
Alexander Whitestone
dabb96d315 docs: record Qwen3.5-9B DFlash Metal timeout (refs #152, #154)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 19s
2026-04-21 22:25:25 -04:00
Alexander Whitestone
69cef8a90f bench: record Apple Silicon DFlash pilot result (refs #152)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 18s
2026-04-21 22:20:15 -04:00
Alexander Whitestone
636d294896 feat: add Apple Silicon DFlash benchmark planner (refs #152)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 18s
2026-04-21 22:00:22 -04:00
492c1cdcfd Merge PR #90
All checks were successful
Smoke Test / smoke (pull_request) Successful in 13s
Merged PR #90: feat: integration test — turboquant compressed model
2026-04-17 01:52:09 +00:00
6e583310a8 Merge PR #91
Merged PR #91: feat: auto-select quantization based on available VRAM
2026-04-17 01:52:06 +00:00
300918ee1e test: quant selector tests (#81)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 12s
2026-04-15 15:04:41 +00:00
f7ea01cb65 feat: auto-select quantization based on available VRAM (#81) 2026-04-15 15:03:04 +00:00
d2edbdadc2 test: add tool call integration tests (#82)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-15 14:53:47 +00:00
c009d8df77 test: add pytest conftest (#82) 2026-04-15 14:53:45 +00:00
17 changed files with 1585 additions and 1006 deletions

View File

@@ -6,7 +6,6 @@ option(TURBOQUANT_BUILD_TESTS "Build standalone TurboQuant validation tests" ON)
add_library(turboquant STATIC
llama-turbo.cpp
llama-turbo-qjl.cpp
)
target_include_directories(turboquant PUBLIC
@@ -34,15 +33,4 @@ if(TURBOQUANT_BUILD_TESTS)
NAME turboquant_roundtrip
COMMAND turboquant_roundtrip_test
)
add_executable(turboquant_qjl_accuracy_test
tests/qjl_accuracy_test.cpp
)
target_link_libraries(turboquant_qjl_accuracy_test PRIVATE turboquant)
target_compile_features(turboquant_qjl_accuracy_test PRIVATE cxx_std_17)
add_test(
NAME turboquant_qjl_accuracy
COMMAND turboquant_qjl_accuracy_test
)
endif()

View File

@@ -30,3 +30,4 @@ See [issues](https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant/i
## Docs
- [Project Status](docs/PROJECT_STATUS.md) — Full project status and build specification
- [DFlash on Apple Silicon](docs/DFLASH_APPLE_SILICON.md) — MLX benchmark planner, setup commands, and report workflow

View File

@@ -0,0 +1,189 @@
#!/usr/bin/env python3
"""Apple Silicon DFlash planning helpers and CLI (issue #152)."""
from __future__ import annotations
import argparse
import json
import platform
import subprocess
from dataclasses import asdict, dataclass
from pathlib import Path
from typing import Iterable, Optional
@dataclass(frozen=True)
class DFlashPair:
slug: str
base_model: str
draft_model: str
estimated_total_weights_gb: float
minimum_recommended_memory_gb: float
draft_sliding_window_size: int = 4096
SUPPORTED_PAIRS: tuple[DFlashPair, ...] = (
DFlashPair(
slug="qwen35-4b",
base_model="Qwen/Qwen3.5-4B",
draft_model="z-lab/Qwen3.5-4B-DFlash",
estimated_total_weights_gb=9.68,
minimum_recommended_memory_gb=16.0,
),
DFlashPair(
slug="qwen35-9b",
base_model="Qwen/Qwen3.5-9B",
draft_model="z-lab/Qwen3.5-9B-DFlash",
estimated_total_weights_gb=19.93,
minimum_recommended_memory_gb=28.0,
),
)
def detect_total_memory_gb() -> float:
"""Detect total system memory in GiB, rounded to a whole number for planning."""
system = platform.system()
if system == "Darwin":
mem_bytes = int(subprocess.check_output(["sysctl", "-n", "hw.memsize"]).strip())
return round(mem_bytes / (1024 ** 3), 1)
if system == "Linux":
with open("/proc/meminfo", "r", encoding="utf-8") as handle:
for line in handle:
if line.startswith("MemTotal:"):
mem_kb = int(line.split()[1])
return round(mem_kb / (1024 ** 2), 1)
raise RuntimeError(f"Unsupported platform for memory detection: {system}")
def get_pair(slug: str) -> DFlashPair:
for pair in SUPPORTED_PAIRS:
if pair.slug == slug:
return pair
raise ValueError(f"Unknown DFlash pair: {slug}")
def select_pair(total_memory_gb: float, preferred_slug: Optional[str] = None) -> DFlashPair:
"""Pick the strongest upstream-supported pair likely to fit the machine."""
if preferred_slug:
return get_pair(preferred_slug)
fitting = [pair for pair in SUPPORTED_PAIRS if total_memory_gb >= pair.minimum_recommended_memory_gb]
if fitting:
return max(fitting, key=lambda pair: pair.minimum_recommended_memory_gb)
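# Nothing meets its memory floor: fall back to the smallest supported pair.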
return SUPPORTED_PAIRS[0]
def build_mlx_benchmark_command(
pair: DFlashPair,
*,
dataset: str = "gsm8k",
max_samples: int = 128,
enable_thinking: bool = True,
) -> str:
"""Build the upstream MLX benchmark command from the DFlash README."""
parts = [
"python -m dflash.benchmark --backend mlx",
f"--model {pair.base_model}",
f"--draft-model {pair.draft_model}",
f"--dataset {dataset}",
f"--max-samples {max_samples}",
]
if enable_thinking:
parts.append("--enable-thinking")
parts.append(f"--draft-sliding-window-size {pair.draft_sliding_window_size}")
return " \\\n ".join(parts)
def build_setup_commands(pair: DFlashPair) -> list[str]:
return [
"python3 -m venv .venv-dflash",
"source .venv-dflash/bin/activate",
"git clone https://github.com/z-lab/dflash.git",
"cd dflash",
"pip install -e .[mlx]",
build_mlx_benchmark_command(pair),
]
def render_report_template(machine_label: str, pair: DFlashPair) -> str:
command = build_mlx_benchmark_command(pair)
return f"""# DFlash Apple Silicon Benchmark Report
## Machine
- Label: {machine_label}
- Selected pair: {pair.slug}
- Base model: {pair.base_model}
- Draft model: {pair.draft_model}
- Estimated total weight footprint: {pair.estimated_total_weights_gb:.2f} GB
## Setup
```bash
python3 -m venv .venv-dflash
source .venv-dflash/bin/activate
git clone https://github.com/z-lab/dflash.git
cd dflash
pip install -e .[mlx]
{command}
```
## Baseline comparison
Compare against **plain MLX or llama.cpp speculative decoding** on the same prompt set.
## Results
- Throughput (tok/s):
- Peak memory (GB):
- Notes on acceptance / behavior:
## Verdict
Worth operationalizing locally?
- [ ] Yes
- [ ] No
- [ ] Needs more data
## Recommendation
Explain whether this should become part of the local inference stack.
"""
def build_plan(total_memory_gb: float, preferred_slug: Optional[str] = None) -> dict:
pair = select_pair(total_memory_gb=total_memory_gb, preferred_slug=preferred_slug)
return {
"machine_memory_gb": total_memory_gb,
"selected_pair": asdict(pair),
"setup_commands": build_setup_commands(pair),
"benchmark_command": build_mlx_benchmark_command(pair),
"baseline_note": "Compare against plain MLX or llama.cpp speculative decoding on the same prompt set.",
}
def write_output(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(content, encoding="utf-8")
def main(argv: Optional[Iterable[str]] = None) -> int:
parser = argparse.ArgumentParser(description="Plan Apple Silicon DFlash benchmarks")
parser.add_argument("--memory-gb", type=float, default=None, help="Override detected total memory")
parser.add_argument("--pair", choices=[pair.slug for pair in SUPPORTED_PAIRS], default=None)
parser.add_argument("--machine-label", default="Apple Silicon Mac")
parser.add_argument("--format", choices=["json", "markdown"], default="markdown")
parser.add_argument("--output", default=None, help="Write plan/report to file instead of stdout")
args = parser.parse_args(list(argv) if argv is not None else None)
memory_gb = args.memory_gb if args.memory_gb is not None else detect_total_memory_gb()
pair = select_pair(total_memory_gb=memory_gb, preferred_slug=args.pair)
if args.format == "json":
content = json.dumps(build_plan(memory_gb, preferred_slug=pair.slug), indent=2)
else:
content = render_report_template(args.machine_label, pair)
if args.output:
write_output(Path(args.output), content)
else:
print(content)
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,41 @@
# DFlash Apple Silicon Benchmark Report
## Machine
- Label: M3 Max 36GB
- Selected pair: qwen35-9b
- Base model: Qwen/Qwen3.5-9B
- Draft model: z-lab/Qwen3.5-9B-DFlash
- Estimated total weight footprint: 19.93 GB
## Setup
```bash
python3 -m venv .venv-dflash
source .venv-dflash/bin/activate
git clone https://github.com/z-lab/dflash.git
cd dflash
pip install -e .[mlx]
python -m dflash.benchmark --backend mlx \
--model Qwen/Qwen3.5-9B \
--draft-model z-lab/Qwen3.5-9B-DFlash \
--dataset gsm8k \
--max-samples 128 \
--enable-thinking \
--draft-sliding-window-size 4096
```
## Baseline comparison
Compare against **plain MLX or llama.cpp speculative decoding** on the same prompt set.
## Results
- Throughput (tok/s):
- Peak memory (GB):
- Notes on acceptance / behavior:
## Verdict
Worth operationalizing locally?
- [ ] Yes
- [ ] No
- [ ] Needs more data
## Recommendation
Explain whether this should become part of the local inference stack.

View File

@@ -0,0 +1,46 @@
# DFlash Apple Silicon Pilot — Qwen3.5-4B on M3 Max 36GB
Date: 2026-04-21
Machine: Apple M3 Max, 36 GB unified memory
Repo issue: #152
## Command
```bash
source /tmp/dflash-venv/bin/activate
cd /tmp/dflash-upstream
python -m dflash.benchmark --backend mlx \
--model Qwen/Qwen3.5-4B \
--draft-model z-lab/Qwen3.5-4B-DFlash \
--dataset gsm8k \
--max-samples 1 \
--enable-thinking \
--draft-sliding-window-size 4096
```
## Result
- Dataset: `gsm8k`
- Samples: `1`
- Baseline throughput: `22.35 tok/s`
- DFlash throughput: `46.78 tok/s`
- Decoding speedup: `2.09x`
- Average acceptance length: `6.48`
Acceptance length histogram:
```text
['0.3%', '11.1%', '12.7%', '10.4%', '11.7%', '7.6%', '7.0%', '3.8%', '5.1%', '6.3%', '2.8%', '3.8%', '2.2%', '1.9%', '0.9%', '2.5%', '9.8%']
```
## Caveats
- This is a **pilot**, not a decision-grade benchmark.
- Only `1` sample was run, so the throughput number is directional.
- No apples-to-apples baseline against plain MLX or llama.cpp speculative decoding is included yet.
- The planner still recommends trying `Qwen/Qwen3.5-9B + z-lab/Qwen3.5-9B-DFlash` on this machine for the more meaningful fit test.
## Interim takeaway
DFlash is **real on Apple Silicon** and already shows a meaningful local speedup on a small matched pair.
A `2.09x` pilot speedup on `Qwen3.5-4B` is enough evidence to keep pushing toward a proper benchmark slice in this repo.

View File

@@ -0,0 +1,59 @@
# DFlash on Apple Silicon Failure Report — Qwen3.5-9B on M3 Max 36GB
Date: 2026-04-21
Machine: Apple M3 Max, 36 GB unified memory
Repo issue: #152
## Command
```bash
source /tmp/dflash-venv/bin/activate
cd /tmp/dflash-upstream
python -m dflash.benchmark --backend mlx \
--model Qwen/Qwen3.5-9B \
--draft-model z-lab/Qwen3.5-9B-DFlash \
--dataset gsm8k \
--max-samples 1 \
--enable-thinking \
--draft-sliding-window-size 4096
```
## Outcome
The benchmark did **not** complete successfully on this machine.
### Failure signature
```text
libc++abi: terminating due to uncaught exception of type std::runtime_error:
[METAL] Command buffer execution failed:
Caused GPU Timeout Error (00000002:kIOGPUCommandBufferCallbackErrorTimeout)
```
Additional shutdown noise:
```text
bash: [11285: 1] tcsetattr: Inappropriate ioctl for device
resource_tracker: There appear to be 1 leaked semaphore objects to clean up at shutdown
```
## Interpretation
This is strong evidence that the `Qwen/Qwen3.5-9B + z-lab/Qwen3.5-9B-DFlash` pair is **not currently stable** on an M3 Max 36GB Mac under the upstream MLX benchmark path, at least with the default settings used here.
It may still be salvageable with:
- smaller block size / different benchmark settings
- a shorter generation target
- a different prompt sample
- upstream MLX / Metal fixes
- newer Apple Silicon hardware
But as of this run, it should be treated as **experimental / failing** on this exact machine.
## Recommendation
For this Mac, the working local proof path is still:
- `Qwen/Qwen3.5-4B`
- `z-lab/Qwen3.5-4B-DFlash`
Use the 4B pair for reproducible local validation while the 9B Metal timeout is investigated separately.

View File

@@ -0,0 +1,125 @@
# DFlash on Apple Silicon
This repo now carries a **Gitea-first benchmark harness** for evaluating whether upstream **DFlash on MLX** is worth adding to the local Apple Silicon inference stack.
## Why
The headline `Kimi K2.6 + DFlash` benchmark was measured on `8x MI300X` with huge RAM and ROCm patches. That exact recipe is not a fit for a `36 GB` Apple Silicon Mac.
What *is* relevant locally is the upstream `z-lab/dflash` MLX path, which can benchmark smaller matched target/draft pairs that fit on Apple Silicon.
## Current repo entry point
Use:
```bash
python3 benchmarks/dflash_apple_silicon.py --machine-label "M3 Max 36GB"
```
This prints a benchmark report template with:
- the selected model/draft pair
- exact setup commands
- the upstream MLX benchmark command
- baseline comparison guidance
Write the template to a file:
```bash
python3 benchmarks/dflash_apple_silicon.py \
--machine-label "M3 Max 36GB" \
--output benchmarks/reports/dflash_m3max_36gb.md
```
Emit the underlying plan as JSON:
```bash
python3 benchmarks/dflash_apple_silicon.py --format json
```
## Selection logic
Today the planner uses two upstream-supported MLX pairs:
- `qwen35-9b`
- base: `Qwen/Qwen3.5-9B`
- draft: `z-lab/Qwen3.5-9B-DFlash`
- chosen for ~28 GB+ machines
- `qwen35-4b`
- base: `Qwen/Qwen3.5-4B`
- draft: `z-lab/Qwen3.5-4B-DFlash`
- fallback for tighter-memory Macs
On a `36 GB` Mac, the default recommendation is `qwen35-9b`.
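A quick sanity check of that rule against the planner module (a sketch only; the import assumes `benchmarks/` is on `sys.path`):
```python
# Hypothetical REPL check of the selection rule in
# benchmarks/dflash_apple_silicon.py.
from dflash_apple_silicon import select_pair

assert select_pair(total_memory_gb=36.0).slug == "qwen35-9b"  # clears the 28 GB floor
assert select_pair(total_memory_gb=16.0).slug == "qwen35-4b"  # below 28 GB: fallback
```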
## Pilot result already landed
A first live Apple Silicon run has already been captured in:
- `benchmarks/reports/dflash_m3max_36gb_qwen35_4b_pilot.md`
Pilot command:
```bash
python -m dflash.benchmark --backend mlx \
--model Qwen/Qwen3.5-4B \
--draft-model z-lab/Qwen3.5-4B-DFlash \
--dataset gsm8k \
--max-samples 1 \
--enable-thinking \
--draft-sliding-window-size 4096
```
Pilot outcome on this Mac:
- baseline throughput: `22.35 tok/s`
- DFlash throughput: `46.78 tok/s`
- decoding speedup: `2.09x`
Treat that as a **directional proof**, not a final decision benchmark. The next step is the fuller comparison slice against plain MLX or llama.cpp speculative decoding.
## Known 9B failure on this machine
A follow-up live run with:
- `Qwen/Qwen3.5-9B`
- `z-lab/Qwen3.5-9B-DFlash`
failed on this same M3 Max 36GB Mac with:
```text
[METAL] Command buffer execution failed:
Caused GPU Timeout Error (00000002:kIOGPUCommandBufferCallbackErrorTimeout)
```
That failure is recorded in:
- `benchmarks/reports/dflash_m3max_36gb_qwen35_9b_timeout.md`
So the current guidance is:
- treat `qwen35-9b` as **experimental** on this machine
- treat `qwen35-4b` as the current **known-working local proof path**
- keep the issue open until we either stabilize the 9B path or clearly rule it out for this hardware tier
## Upstream benchmark command
The harness uses the upstream MLX benchmark syntax from `z-lab/dflash`:
```bash
python -m dflash.benchmark --backend mlx \
--model Qwen/Qwen3.5-9B \
--draft-model z-lab/Qwen3.5-9B-DFlash \
--dataset gsm8k \
--max-samples 128 \
--enable-thinking \
--draft-sliding-window-size 4096
```
## What remains
This PR adds the **planner + report template** so the benchmark is reproducible from the repo.
The issue remains open until a decision-grade Apple Silicon run lands with:
- measured throughput
- measured memory
- a baseline comparison against plain MLX or llama.cpp speculative decoding
- a recommendation on whether to operationalize DFlash locally

View File

@@ -1,143 +0,0 @@
# QJL Residual Correction — Implementation Plan
**Issue:** #66
**Status:** Implementation + accuracy gates
**Blocking:** Full TurboQuant deployment (currently PolarQuant-only)
---
## What is QJL?
Quantized Johnson-Lindenstrauss (QJL) is the second stage of TurboQuant. It corrects the quantization error left by PolarQuant using 1-bit sign projections.
**Without QJL:** PolarQuant-only ≈ 4.2x compression, ~4-bit/channel
**With QJL:** Full TurboQuant ≈ 7.1x compression, ~3.5-bit/channel, zero accuracy loss
The key insight: the residual `x - PolarQuant(x)` is small but structured. QJL captures the *direction* of the residual using a random projection, then stores just the sign (1 bit per projection dimension).
---
## Algorithm
### Encode (per KV vector)
1. PolarQuant encode → 4-bit indices + radius (existing)
2. Decode PolarQuant back to get reconstruction
3. Compute residual: `r = x - reconstruction`
4. Project onto JL space: `p = R^T * r` (R is a fixed random d × 64 matrix with ±1/√64 entries)
5. 1-bit quantize projections: `signs = sign(p)` → 64 bits = 8 bytes
### Decode (per KV vector)
1. PolarQuant decode → reconstructed vector (existing)
2. Unpack sign bits → ±1 array
3. Reconstruct correction: `correction = R * signs * scale`
4. Add correction: `output = reconstruction + correction`
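A minimal NumPy sketch of just the QJL stage (PolarQuant encode/decode assumed given; d = 128, m = 64 as above, using the scale rule from the CPU implementation). Illustrative, not the production code:
```python
import numpy as np

d, m = 128, 64
rng = np.random.default_rng(0xDEADBEEF)
R = rng.choice([-1.0, 1.0], size=(d, m)) / np.sqrt(m)  # fixed JL projection

def qjl_encode(residual):
    """Encode steps 4-5: project the residual, keep only the signs."""
    signs = np.where(R.T @ residual >= 0.0, 1.0, -1.0)  # 64 bits in practice
    scale = np.linalg.norm(residual) * np.sqrt(np.pi / 2) / np.sqrt(d)
    return signs, scale

def qjl_decode(signs, scale):
    """Decode steps 2-3: reconstruct the correction in the original space."""
    return scale * (R @ signs)
```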
### Storage
| Component | Bytes/vector (d=128) |
|-----------|---------------------|
| PolarQuant | 64 (4-bit indices) |
| QJL signs | 8 (1-bit × 64) |
| **Total** | **72 bytes** |
| FP32 | 512 bytes |
| FP16 | 256 bytes |
**Compression:** 7.1x vs FP32, 3.6x vs FP16
---
## Files Added
### Core Implementation
- `llama-turbo-qjl.h` — QJL API header
- `llama-turbo-qjl.cpp` — CPU reference implementation
### Metal Kernels
- `ggml-metal-qjl.metal` — GPU kernels for encode/decode
### Tests
- `tests/qjl_accuracy_test.cpp` — 8 accuracy gate tests
### Updated
- `CMakeLists.txt` — Added QJL library and test targets
---
## Accuracy Gates
Target: perplexity delta < 0.1% vs f16 (to be validated end-to-end with llama-perplexity).
Proxy gates (unit tests):
| Gate | Threshold | Rationale |
|------|-----------|-----------|
| Cosine similarity | ≥ 0.95 | Direction preservation for attention scores |
| Max absolute error | ≤ 0.8 | 1-bit quantization has bounded per-element error |
| Mean absolute error | ≤ 0.2 | Average reconstruction quality |
| Zero vector | Exact zero | Edge case correctness |
| Determinism | Exact match | Encode must be reproducible |
| Compression ratio | > 6x vs FP32 | Storage efficiency |
**Note on 1-bit accuracy:** 1-bit QJL stores only the sign of each projection, losing magnitude information. The scale factor (residual norm) is estimated from the original residual. This means:
- Direction is well-preserved (cosine > 0.95)
- Magnitude has bounded error (proportional to residual energy)
- Real quality benefit shows in perplexity (attention dot products), not per-vector MAE
- For tighter accuracy, consider 2-bit or 4-bit QJL variants (future work)
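Continuing the sketch above, a quick direction-preservation check on a random residual (for m = 64, d = 128 the expected correction cosine is roughly sqrt(2/pi) * sqrt(m/d) ≈ 0.56; the ≥ 0.95 gate applies to the full PolarQuant + QJL reconstruction, not the correction alone):
```python
r = rng.normal(scale=0.1, size=d)  # stand-in residual
signs, scale = qjl_encode(r)
c = qjl_decode(signs, scale)
cos = float(r @ c) / (np.linalg.norm(r) * np.linalg.norm(c))
print(f"cosine(residual, correction) = {cos:.2f}")  # typically ~0.5-0.6
```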
---
## Integration Points
### llama-turbo.cpp (CPU)
```cpp
// Existing PolarQuant path
polar_quant_encode_turbo4(src, dst_polar, &norm, d);
polar_quant_decode_turbo4(dst_polar, decoded, norm, d);
// Add QJL path (new)
turboquant_encode_qjl(src, dst_polar, &norm, dst_qjl, d);
turboquant_decode_qjl(dst_polar, norm, src_qjl, decoded, d);
```
### ggml-metal-turbo.metal (GPU)
```metal
// Add QJL kernels alongside existing turbo4 kernels
kernel void kernel_qjl_encode_residual(...);
kernel void kernel_qjl_decode_residual(...);
kernel void kernel_turboquant_qjl_dequant(...); // Fused attention path
```
### llama.cpp Integration
1. Add `GGML_TYPE_TURBOQUANT_QJL` to ggml_type enum
2. Allocate QJL sign storage alongside PolarQuant in KV cache
3. Use fused dequant kernel in attention hot path
---
## Trade-offs
| Factor | PolarQuant-only | TurboQuant (with QJL) |
|--------|----------------|----------------------|
| Compression | 4.2x (FP32) | 7.1x (FP32) |
| Bits/channel | ~4 | ~3.5 |
| Storage/vector | 64 bytes | 72 bytes |
| Encode overhead | Low | +30% (extra roundtrip + projection) |
| Decode overhead | Low | +15% (extra correction add) |
| Quality | Good | Excellent (zero accuracy loss) |
**Recommendation:** Enable QJL for production. The 12.5% storage overhead buys significant quality improvement, especially for long-context sessions where quantization errors accumulate.
---
## Next Steps
1. ✅ QJL CPU reference implementation
2. ✅ Metal kernel templates
3. ✅ Accuracy gate tests
4. ⬜ Build and run tests on M1
5. ⬜ Benchmark QJL vs PolarQuant-only perplexity
6. ⬜ Integrate into llama.cpp fork KV cache path
7. ⬜ End-to-end attention score accuracy test
---
*Implementation plan for Issue #66. Closes #66.*

evolution/quant_selector.py Normal file
View File

@@ -0,0 +1,548 @@
"""Auto-select TurboQuant compression level based on available VRAM/RAM.
Detects hardware resources at startup and picks the highest quality
quantization level that fits within available memory. Supports Apple
Silicon unified memory, NVIDIA GPUs (via nvidia-smi), and CPU-only fallback.
Usage:
from evolution.quant_selector import select_quant_level
selection = select_quant_level(model_size_gb=14.0, context_length=32768)
print(selection.level) # "turbo4"
print(selection.reasoning) # "M4 Max 36GB unified: turbo4 fits 14.0GB model + ..."
print(selection.env_vars) # {"TURBO_LAYER_ADAPTIVE": "7"}
"""
import logging
import os
import platform
import subprocess
from dataclasses import dataclass, field
from typing import Optional
logger = logging.getLogger(__name__)
# ── Quant Level Definitions ───────────────────────────────────────────────────
@dataclass
class QuantLevel:
"""A TurboQuant compression level with its memory characteristics."""
name: str # e.g. "turbo4"
bits_per_channel: float # e.g. 3.5 for turbo4
compression_ratio: float # vs uncompressed KV cache
quality_label: str # "best", "high", "balanced", "fast"
layer_adaptive: int # TURBO_LAYER_ADAPTIVE value (0-7)
kv_type: str # -ctk/-ctv flag value
min_memory_headroom_gb: float # Minimum free memory to recommend this level
description: str = ""
# Ordered from highest quality to most aggressive compression
QUANT_LEVELS = [
QuantLevel(
name="turbo4",
bits_per_channel=3.5,
compression_ratio=4.2,
quality_label="best",
layer_adaptive=7,
kv_type="turbo4",
min_memory_headroom_gb=4.0,
description="PolarQuant + QJL 4-bit. Best quality, ~4.2x KV compression."
),
QuantLevel(
name="turbo3",
bits_per_channel=2.5,
compression_ratio=6.0,
quality_label="high",
layer_adaptive=5,
kv_type="turbo3",
min_memory_headroom_gb=3.0,
description="3-bit TurboQuant. High quality, ~6x KV compression."
),
QuantLevel(
name="turbo2",
bits_per_channel=1.5,
compression_ratio=10.0,
quality_label="balanced",
layer_adaptive=3,
kv_type="turbo2",
min_memory_headroom_gb=2.0,
description="2-bit TurboQuant. Balanced, ~10x KV compression."
),
QuantLevel(
name="q4_0",
bits_per_channel=4.0,
compression_ratio=3.5,
quality_label="fast",
layer_adaptive=0,
kv_type="q4_0",
min_memory_headroom_gb=1.5,
description="Standard 4-bit quant. Fast fallback, no TurboQuant."
),
]
# ── Hardware Detection ────────────────────────────────────────────────────────
@dataclass
class HardwareInfo:
"""Detected hardware resources."""
total_memory_gb: float
available_memory_gb: float
gpu_memory_gb: Optional[float] = None
gpu_name: Optional[str] = None
is_apple_silicon: bool = False
chip_name: Optional[str] = None
cpu_cores: int = 0
detection_method: str = ""
def detect_hardware() -> HardwareInfo:
"""Detect available memory and GPU resources."""
system = platform.system()
if system == "Darwin":
return _detect_apple_silicon()
elif system == "Linux":
return _detect_linux()
else:
return _detect_generic(system)
def _detect_apple_silicon() -> HardwareInfo:
"""Detect Apple Silicon unified memory."""
info = HardwareInfo(
total_memory_gb=0,
available_memory_gb=0,
is_apple_silicon=True,
detection_method="sysctl",
)
try:
# Get total memory
result = subprocess.run(
["sysctl", "-n", "hw.memsize"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.total_memory_gb = int(result.stdout.strip()) / (1024**3)
# Get chip name
result = subprocess.run(
["sysctl", "-n", "machdep.cpu.brand_string"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.chip_name = result.stdout.strip()
# Try to get GPU name (Apple Silicon)
result = subprocess.run(
["system_profiler", "SPDisplaysDataType"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
for line in result.stdout.split("\n"):
if "Chipset" in line or "GPU" in line:
info.gpu_name = line.split(":")[-1].strip()
break
# Estimate available memory (vm_stat)
result = subprocess.run(
["vm_stat"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
# Page size is 16384 on Apple Silicon (4096 on Intel Macs); parse it
# from the vm_stat header rather than hardcoding one value.
page_size = 16384
free_pages = 0
for line in result.stdout.split("\n"):
if "page size of" in line:
try:
page_size = int(line.split("page size of")[-1].split()[0])
except (IndexError, ValueError):
pass
elif "Pages free:" in line:
try:
free_pages = int(line.split(":")[-1].strip().rstrip("."))
except ValueError:
pass
# Available ≈ free + some speculative (conservative: just free)
info.available_memory_gb = (free_pages * page_size) / (1024**3)
# Fallback if vm_stat parsing failed
if info.available_memory_gb < 1:
# Conservative: 70% of total
info.available_memory_gb = info.total_memory_gb * 0.70
# Apple Silicon shares memory — GPU memory = total memory
info.gpu_memory_gb = info.total_memory_gb
# Detect CPU cores
result = subprocess.run(
["sysctl", "-n", "hw.ncpu"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.cpu_cores = int(result.stdout.strip())
except Exception as e:
logger.warning(f"Apple Silicon detection failed: {e}")
# Fallback
info.total_memory_gb = 16.0
info.available_memory_gb = 12.0
info.detection_method = "fallback"
return info
def _detect_linux() -> HardwareInfo:
"""Detect Linux system with optional NVIDIA GPU."""
info = HardwareInfo(
total_memory_gb=0,
available_memory_gb=0,
detection_method="proc",
)
try:
# Read /proc/meminfo
with open("/proc/meminfo", "r") as f:
meminfo = f.read()
for line in meminfo.split("\n"):
if line.startswith("MemTotal:"):
kb = int(line.split()[1])
info.total_memory_gb = kb / (1024 * 1024)
elif line.startswith("MemAvailable:"):
kb = int(line.split()[1])
info.available_memory_gb = kb / (1024 * 1024)
# CPU cores
info.cpu_cores = os.cpu_count() or 1
# Check for NVIDIA GPU
try:
result = subprocess.run(
["nvidia-smi", "--query-gpu=name,memory.total,memory.free",
"--format=csv,noheader,nounits"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
lines = result.stdout.strip().split("\n")
if lines:
parts = lines[0].split(", ")
if len(parts) >= 3:
info.gpu_name = parts[0].strip()
info.gpu_memory_gb = float(parts[1]) / 1024 # MB to GB
gpu_free = float(parts[2]) / 1024
# Use GPU free for VRAM-based selection
info.available_memory_gb = max(info.available_memory_gb, gpu_free)
info.detection_method = "nvidia-smi"
except (FileNotFoundError, subprocess.TimeoutExpired):
pass # No NVIDIA GPU
except Exception as e:
logger.warning(f"Linux detection failed: {e}")
info.total_memory_gb = 16.0
info.available_memory_gb = 12.0
info.detection_method = "fallback"
return info
def _detect_generic(system: str) -> HardwareInfo:
"""Fallback detection for unknown systems."""
import psutil
mem = psutil.virtual_memory()
return HardwareInfo(
total_memory_gb=mem.total / (1024**3),
available_memory_gb=mem.available / (1024**3),
cpu_cores=os.cpu_count() or 1,
detection_method="psutil",
)
# ── KV Cache Memory Estimation ───────────────────────────────────────────────
def estimate_kv_cache_gb(
context_length: int,
num_layers: int = 48,
num_kv_heads: int = 8,
head_dim: int = 128,
bits_per_channel: float = 3.5,
) -> float:
"""Estimate KV cache memory for given parameters.
Formula: 2 (K+V) × layers × kv_heads × head_dim × context_length × bits/8
"""
bytes_per_element = bits_per_channel / 8.0
total_bytes = 2 * num_layers * num_kv_heads * head_dim * context_length * bytes_per_element
return total_bytes / (1024**3)
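# Worked example with the defaults above (48 layers, 8 KV heads, head_dim 128)
# at a 32768-token context and 3.5 bits/channel:
#   2 * 48 * 8 * 128 * 32768 * (3.5 / 8) bytes = 1,409,286,144 bytes ≈ 1.31 GiB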
def estimate_model_memory_gb(model_size_gb: float, quant_type: str = "q4_k_m") -> float:
"""Estimate model weights memory. Returns loaded size in GB.
model_size_gb is assumed to already be the quantized on-disk size, so it
is returned unchanged; the multiplier table below is kept as reference
data for future conversion from fp16 sizes.
"""
# Common quant ratios (vs fp16)
quant_multipliers = {
"f16": 1.0,
"q8_0": 0.5,
"q6_k": 0.42,
"q5_k_m": 0.37,
"q4_k_m": 0.32,
"q3_k_m": 0.27,
"q2_k": 0.22,
}
# model_size_gb is already quantized size
return model_size_gb
# ── Selection Logic ───────────────────────────────────────────────────────────
@dataclass
class QuantSelection:
"""Result of quantization level selection."""
level: QuantLevel
hardware: HardwareInfo
reasoning: str
total_required_gb: float
available_gb: float
headroom_gb: float
env_vars: dict = field(default_factory=dict)
server_flags: dict = field(default_factory=dict)
warnings: list = field(default_factory=list)
def select_quant_level(
model_size_gb: float = 14.0,
context_length: int = 32768,
num_layers: int = 48,
num_kv_heads: int = 8,
head_dim: int = 128,
preferred_level: Optional[str] = None,
force_cpu: bool = False,
) -> QuantSelection:
"""Select the best quantization level for available hardware.
Args:
model_size_gb: Size of the model weights in GB
context_length: Target context length
num_layers: Number of transformer layers
num_kv_heads: Number of KV attention heads
head_dim: Dimension per attention head
preferred_level: Force a specific level (still checks if it fits)
force_cpu: If True, ignore GPU memory
Returns:
QuantSelection with the chosen level and reasoning
"""
hw = detect_hardware()
if force_cpu:
hw.gpu_memory_gb = None
hw.gpu_name = None
# Use the most restrictive memory constraint
# For Apple Silicon: unified memory, use total
# For NVIDIA: use GPU VRAM
# For CPU-only: use system RAM
# Check Apple Silicon first: it may report a GPU name too, but memory is unified.
if hw.is_apple_silicon:
memory_pool_gb = hw.total_memory_gb
memory_label = f"{hw.chip_name or 'Apple Silicon'} {hw.total_memory_gb:.0f}GB unified"
elif hw.gpu_memory_gb and hw.gpu_name:
memory_pool_gb = hw.gpu_memory_gb
memory_label = f"{hw.gpu_name} {hw.gpu_memory_gb:.0f}GB VRAM"
else:
memory_pool_gb = hw.total_memory_gb
memory_label = f"{hw.cpu_cores}c CPU {hw.total_memory_gb:.0f}GB RAM"
model_mem = estimate_model_memory_gb(model_size_gb)
# Try levels from best to most compressed
chosen = None
for level in QUANT_LEVELS:
if preferred_level and level.name != preferred_level:
continue
kv_mem = estimate_kv_cache_gb(
context_length, num_layers, num_kv_heads, head_dim,
level.bits_per_channel
)
total_required = model_mem + kv_mem
headroom = memory_pool_gb - total_required
if headroom >= level.min_memory_headroom_gb:
chosen = level
break
if preferred_level and level.name == preferred_level:
# User forced this level but it doesn't fit
chosen = level
break
if chosen is None:
# Nothing fits — pick the most aggressive compression, not the q4_0 fallback.
chosen = max(QUANT_LEVELS, key=lambda level: level.compression_ratio)
logger.warning(f"No quant level fits in {memory_pool_gb:.1f}GB. Using {chosen.name}.")
# Calculate final numbers
kv_mem = estimate_kv_cache_gb(
context_length, num_layers, num_kv_heads, head_dim,
chosen.bits_per_channel
)
total_required = model_mem + kv_mem
headroom = memory_pool_gb - total_required
# Build reasoning
reasoning_parts = [
f"{memory_label}:",
f"{chosen.name} ({chosen.quality_label}, {chosen.bits_per_channel:.1f}b/ch,",
f"{chosen.compression_ratio:.1f}x compression)",
f"fits {model_mem:.1f}GB model + {kv_mem:.1f}GB KV cache",
f"@ {context_length}K context = {total_required:.1f}GB / {memory_pool_gb:.0f}GB",
f"({headroom:.1f}GB headroom)"
]
reasoning = " ".join(reasoning_parts)
# Build environment variables for llama.cpp
env_vars = {
"TURBO_LAYER_ADAPTIVE": str(chosen.layer_adaptive),
}
# Build server flags
server_flags = {
"-ctk": chosen.kv_type,
"-ctv": chosen.kv_type,
"-c": str(context_length),
}
# Warnings
warnings = []
if headroom < 2.0:
warnings.append(
f"Low headroom ({headroom:.1f}GB). Consider reducing context length or model size."
)
if headroom < 0:
warnings.append(
f"OVERCOMMITTED: needs {total_required:.1f}GB but only {memory_pool_gb:.0f}GB available. "
f"Inference may fail or swap heavily."
)
selection = QuantSelection(
level=chosen,
hardware=hw,
reasoning=reasoning,
total_required_gb=total_required,
available_gb=memory_pool_gb,
headroom_gb=headroom,
env_vars=env_vars,
server_flags=server_flags,
warnings=warnings,
)
logger.info(f"Quant selection: {reasoning}")
for w in warnings:
logger.warning(w)
return selection
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
"""CLI entry point for quant level selection."""
import argparse
import json
parser = argparse.ArgumentParser(
description="Auto-select TurboQuant compression level based on available hardware"
)
parser.add_argument("--model-size", type=float, default=14.0,
help="Model size in GB (default: 14.0)")
parser.add_argument("--context", type=int, default=32768,
help="Target context length (default: 32768)")
parser.add_argument("--layers", type=int, default=48,
help="Number of transformer layers (default: 48)")
parser.add_argument("--kv-heads", type=int, default=8,
help="Number of KV attention heads (default: 8)")
parser.add_argument("--head-dim", type=int, default=128,
help="Dimension per attention head (default: 128)")
parser.add_argument("--prefer", type=str, default=None,
choices=[l.name for l in QUANT_LEVELS],
help="Prefer a specific quant level")
parser.add_argument("--force-cpu", action="store_true",
help="Ignore GPU, use CPU memory only")
parser.add_argument("--json", action="store_true",
help="JSON output for automation")
parser.add_argument("--detect-only", action="store_true",
help="Only detect hardware, don't select")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(message)s")
if args.detect_only:
hw = detect_hardware()
if args.json:
print(json.dumps(hw.__dict__, default=str, indent=2))
else:
print(f"Total memory: {hw.total_memory_gb:.1f} GB")
print(f"Available: {hw.available_memory_gb:.1f} GB")
if hw.gpu_memory_gb:
print(f"GPU memory: {hw.gpu_memory_gb:.1f} GB")
if hw.gpu_name:
print(f"GPU: {hw.gpu_name}")
if hw.is_apple_silicon:
print(f"Chip: {hw.chip_name or 'Apple Silicon'}")
print(f"CPU cores: {hw.cpu_cores}")
print(f"Detection: {hw.detection_method}")
return
selection = select_quant_level(
model_size_gb=args.model_size,
context_length=args.context,
num_layers=args.layers,
num_kv_heads=args.kv_heads,
head_dim=args.head_dim,
preferred_level=args.prefer,
force_cpu=args.force_cpu,
)
if args.json:
result = {
"level": selection.level.name,
"bits_per_channel": selection.level.bits_per_channel,
"compression_ratio": selection.level.compression_ratio,
"quality": selection.level.quality_label,
"reasoning": selection.reasoning,
"total_required_gb": round(selection.total_required_gb, 2),
"available_gb": round(selection.available_gb, 1),
"headroom_gb": round(selection.headroom_gb, 2),
"env_vars": selection.env_vars,
"server_flags": selection.server_flags,
"warnings": selection.warnings,
"hardware": {
"total_memory_gb": round(selection.hardware.total_memory_gb, 1),
"gpu_name": selection.hardware.gpu_name,
"is_apple_silicon": selection.hardware.is_apple_silicon,
"chip_name": selection.hardware.chip_name,
"cpu_cores": selection.hardware.cpu_cores,
},
}
print(json.dumps(result, indent=2))
else:
print(f"Selected: {selection.level.name} ({selection.level.quality_label})")
print(f" {selection.reasoning}")
print()
print(f"Environment variables:")
for k, v in selection.env_vars.items():
print(f" export {k}={v}")
print()
print(f"Server flags:")
for k, v in selection.server_flags.items():
print(f" {k} {v}")
if selection.warnings:
print()
for w in selection.warnings:
print(f" WARNING: {w}")
if __name__ == "__main__":
main()

View File

@@ -1,241 +0,0 @@
// QJL (Quantized Johnson-Lindenstrauss) Residual Correction — Metal Kernels
//
// These kernels implement the QJL stage of TurboQuant on Apple GPU.
// QJL corrects the quantization error from PolarQuant using 1-bit sign projections.
//
// Algorithm:
// Encode: residual = x - PolarQuant(x), then sign(R^T * residual) → 1 bit
// Decode: PolarQuant(x) + R * signs * scale → corrected reconstruction
#include <metal_stdlib>
using namespace metal;
// ── Constants ──────────────────────────────────────────────────────────
constant uint QJL_PROJ_DIM = 64;
constant uint QJL_PROJ_DIM_PACKED = 8; // 64 bits / 8 bits per byte
// ── QJL Projection Matrix ─────────────────────────────────────────────
// Pre-generated with seed 0xDEADBEEF for reproducibility
// This is a d x 64 matrix of ±1/sqrt(64) entries
// Stored in constant memory for fast broadcast access
//
// NOTE: In production, this would be generated at model load time
// and stored in a Metal buffer. This is the reference pattern.
// ── QJL Residual Encode Kernel ─────────────────────────────────────────
// Projects the residual vector onto the QJL space and packs sign bits.
//
// Inputs:
// residual [buffer(0)]: float array [d] — the quantization error
// proj_matrix [buffer(1)]: float array [d * 64] — JL projection matrix
//
// Output:
// signs_packed [buffer(2)]: uchar array [8] — packed 1-bit signs
//
// Dispatch: 1 threadgroup per vector
kernel void kernel_qjl_encode_residual(
device const float* residual [[buffer(0)]],
device const float* proj_matrix [[buffer(1)]],
device uchar* signs_packed [[buffer(2)]],
constant uint& d [[buffer(3)]],
uint tid [[thread_position_in_threadgroup]],
uint tpg [[threads_per_threadgroup]]
) {
const uint proj_dim = QJL_PROJ_DIM;
// Each thread handles a subset of projection dimensions
// Then we reduce and pack
threadgroup float projections[QJL_PROJ_DIM];
for (uint j = tid; j < proj_dim; j += tpg) {
float dot = 0.0f;
for (uint i = 0; i < d; i++) {
dot += residual[i] * proj_matrix[i * proj_dim + j];
}
projections[j] = dot;
}
threadgroup_barrier(mem_flags::mem_threadgroup);
// Thread 0 packs sign bits
if (tid == 0) {
uchar packed[QJL_PROJ_DIM_PACKED];
for (uint b = 0; b < QJL_PROJ_DIM_PACKED; b++) {
packed[b] = 0;
}
for (uint j = 0; j < proj_dim; j++) {
if (projections[j] >= 0.0f) {
packed[j / 8] |= (1u << (j % 8));
}
}
// Write output
for (uint b = 0; b < QJL_PROJ_DIM_PACKED; b++) {
signs_packed[b] = packed[b];
}
}
}
// ── QJL Residual Decode Kernel ─────────────────────────────────────────
// Unpacks sign bits and reconstructs correction vector in original space.
//
// Inputs:
// signs_packed [buffer(0)]: uchar array [8] — packed 1-bit signs
// proj_matrix [buffer(1)]: float array [d * 64] — JL projection matrix
//
// Output:
// correction [buffer(2)]: float array [d] — correction vector
//
// Dispatch: 1 threadgroup per vector, threads handle output dimensions
kernel void kernel_qjl_decode_residual(
device const uchar* signs_packed [[buffer(0)]],
device const float* proj_matrix [[buffer(1)]],
device float* correction [[buffer(2)]],
constant uint& d [[buffer(3)]],
uint tid [[thread_position_in_threadgroup]],
uint tpg [[threads_per_threadgroup]]
) {
const uint proj_dim = QJL_PROJ_DIM;
// Unpack sign bits to ±1
threadgroup float signs[QJL_PROJ_DIM];
if (tid == 0) {
for (uint j = 0; j < proj_dim; j++) {
bool positive = (signs_packed[j / 8] >> (j % 8)) & 1;
signs[j] = positive ? 1.0f : -1.0f;
}
}
threadgroup_barrier(mem_flags::mem_threadgroup);
// Each thread computes a subset of output dimensions
// correction[i] = sum_j proj_matrix[i*m + j] * signs[j]
for (uint i = tid; i < d; i += tpg) {
float sum = 0.0f;
for (uint j = 0; j < proj_dim; j++) {
sum += proj_matrix[i * proj_dim + j] * signs[j];
}
correction[i] = sum;
}
}
// ── Fused TurboQuant + QJL Dequant Kernel ──────────────────────────────
// Single-kernel dequantization: PolarQuant reconstruction + QJL correction.
// This is the attention hot path kernel.
//
// Inputs:
// polar_packed [buffer(0)]: uchar array [d/2] — 4-bit PolarQuant indices
// polar_norm [buffer(1)]: float — L2 norm (radius)
// qjl_signs [buffer(2)]: uchar array [8] — QJL packed sign bits
// proj_matrix [buffer(3)]: float array [d * 64] — JL projection matrix
//
// Output:
// dst [buffer(4)]: float array [d] — corrected reconstruction
//
// Dispatch: 1 thread per vector (same as kernel_turbo4_dequant)
kernel void kernel_turboquant_qjl_dequant(
device const uchar* polar_packed [[buffer(0)]],
device const float* polar_norm [[buffer(1)]],
device const uchar* qjl_signs [[buffer(2)]],
device const float* proj_matrix [[buffer(3)]],
device float* dst [[buffer(4)]],
constant uint& d [[buffer(5)]],
uint tid [[thread_position_in_grid]]
) {
const uint proj_dim = QJL_PROJ_DIM;
// Offset for this vector
uint base_polar = tid * (d / 2);
uint base_qjl = tid * QJL_PROJ_DIM_PACKED;
uint base_dst = tid * d;
float norm = polar_norm[tid];
// Step 1: PolarQuant decode (inline, same as kernel_turbo4_dequant)
// Reuse existing centroids from turbo4
const float centroids[16] = {
-0.2154, -0.1523, -0.1121, -0.0812,
-0.0554, -0.0321, -0.0105, 0.0105,
0.0321, 0.0554, 0.0812, 0.1121,
0.1523, 0.2154, 0.2800, 0.3500
};
for (uint i = 0; i < d; i++) {
uchar packed = polar_packed[base_polar + (i / 2)];
uint idx = (i % 2 == 0) ? (packed & 0x0F) : (packed >> 4);
dst[base_dst + i] = centroids[idx] * norm;
}
// Step 2: Unpack QJL signs
float signs[QJL_PROJ_DIM];
for (uint j = 0; j < proj_dim; j++) {
bool positive = (qjl_signs[base_qjl + (j / 8)] >> (j % 8)) & 1;
signs[j] = positive ? 1.0f : -1.0f;
}
// Step 3: Add QJL correction
// correction_scale = norm / sqrt(d)
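// NOTE: this fused path receives no per-vector qjl_scale buffer, so the
// correction scale is approximated from the PolarQuant norm; the CPU path
// stores an exact scale computed from the residual norm at encode time.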
float correction_scale = norm / sqrt(float(d));
for (uint i = 0; i < d; i++) {
float correction = 0.0f;
for (uint j = 0; j < proj_dim; j++) {
correction += proj_matrix[i * proj_dim + j] * signs[j];
}
dst[base_dst + i] += correction * correction_scale;
}
// Note: In production, FWHT would be applied here or fused into attention
}
// ── Batch QJL Encode Kernel ────────────────────────────────────────────
// Processes multiple residual vectors in parallel.
// Used during KV cache writes (one vector per token per head).
//
// Inputs:
// residuals [buffer(0)]: float array [n_vectors * d]
// proj_matrix [buffer(1)]: float array [d * 64]
//
// Output:
// signs_packed [buffer(2)]: uchar array [n_vectors * 8]
//
// Dispatch: n_vectors threads (one per vector)
kernel void kernel_qjl_encode_batch(
device const float* residuals [[buffer(0)]],
device const float* proj_matrix [[buffer(1)]],
device uchar* signs_packed [[buffer(2)]],
constant uint& d [[buffer(3)]],
uint tid [[thread_position_in_grid]]
) {
const uint proj_dim = QJL_PROJ_DIM;
uint base_residual = tid * d;
uint base_signs = tid * QJL_PROJ_DIM_PACKED;
// Project and pack
uchar packed[QJL_PROJ_DIM_PACKED];
for (uint b = 0; b < QJL_PROJ_DIM_PACKED; b++) {
packed[b] = 0;
}
for (uint j = 0; j < proj_dim; j++) {
float dot = 0.0f;
for (uint i = 0; i < d; i++) {
dot += residuals[base_residual + i] * proj_matrix[i * proj_dim + j];
}
if (dot >= 0.0f) {
packed[j / 8] |= (1u << (j % 8));
}
}
// Write output
for (uint b = 0; b < QJL_PROJ_DIM_PACKED; b++) {
signs_packed[base_signs + b] = packed[b];
}
}

View File

@@ -1,167 +0,0 @@
#include "llama-turbo-qjl.h"
#include <cmath>
#include <cstdint>
#include <cstring>
#include <random>
#include <vector>
// ── QJL Projection Matrix ─────────────────────────────────────────────
static constexpr uint32_t QJL_MATRIX_SEED = 0xDEADBEEF;
static std::vector<float> g_proj_matrix;
static bool g_proj_initialized = false;
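// NOTE: the lazy initialization below is not thread-safe; first use is
// assumed to occur on a single thread (or pre-generate the matrix up front).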
static void ensure_proj_matrix(int d) {
if (!g_proj_initialized || (int)g_proj_matrix.size() != d * QJL_PROJ_DIM) {
g_proj_matrix.resize(d * QJL_PROJ_DIM);
qjl_generate_projection_matrix(g_proj_matrix.data(), d, QJL_MATRIX_SEED);
g_proj_initialized = true;
}
}
void qjl_generate_projection_matrix(float* matrix, int d, uint32_t seed) {
std::mt19937 rng(seed);
std::uniform_int_distribution<int> coin(0, 1);
const float scale = 1.0f / std::sqrt((float)QJL_PROJ_DIM);
for (int i = 0; i < d * QJL_PROJ_DIM; i++) {
matrix[i] = (coin(rng) == 0 ? -1.0f : 1.0f) * scale;
}
}
// ── QJL Residual Encode ───────────────────────────────────────────────
float qjl_encode_residual(
const float* residual,
const float* proj_matrix,
uint8_t* signs_out,
int d
) {
// Step 1: Project residual onto JL space
float projections[QJL_PROJ_DIM];
for (int j = 0; j < QJL_PROJ_DIM; j++) {
float dot = 0.0f;
for (int i = 0; i < d; i++) {
dot += residual[i] * proj_matrix[i * QJL_PROJ_DIM + j];
}
projections[j] = dot;
}
// Step 2: Compute residual norm
float residual_norm = 0.0f;
for (int i = 0; i < d; i++) {
residual_norm += residual[i] * residual[i];
}
residual_norm = std::sqrt(residual_norm);
// Step 3: Compute scale factor
// For a Rademacher matrix R with entries ±1/sqrt(m), the reconstruction
// R * sign(R^T * r) points along r_hat with factor c ≈ sqrt(2/pi) ≈ 0.798,
// and empirically has norm ≈ sqrt(d) * sqrt(2/pi). Requiring
// ||scale * R * sign(R^T * r)|| = ||r|| gives:
//   scale = ||r|| / (sqrt(d) * sqrt(2/pi)) = ||r|| * sqrt(pi/2) / sqrt(d)
constexpr float kSqrtPiOver2 = 1.25331413732f; // sqrt(pi/2)
float scale = residual_norm * kSqrtPiOver2 / std::sqrt((float)d);
// For very small residuals, just skip the correction
if (residual_norm < 1e-6f) {
scale = 0.0f;
}
// Step 4: Pack sign bits
std::memset(signs_out, 0, QJL_BYTES_PER_VECTOR);
for (int j = 0; j < QJL_PROJ_DIM; j++) {
if (projections[j] >= 0.0f) {
signs_out[j / 8] |= (1u << (j % 8));
}
}
return scale;
}
// ── QJL Residual Decode ───────────────────────────────────────────────
void qjl_decode_residual(
const uint8_t* signs_in,
const float* proj_matrix,
float scale,
float* correction_out,
int d
) {
if (scale < 1e-9f) {
std::memset(correction_out, 0, d * sizeof(float));
return;
}
// Unpack signs to ±scale
float signs[QJL_PROJ_DIM];
for (int j = 0; j < QJL_PROJ_DIM; j++) {
bool positive = (signs_in[j / 8] >> (j % 8)) & 1;
signs[j] = positive ? scale : -scale;
}
// Reconstruct: correction = R * signs
std::memset(correction_out, 0, d * sizeof(float));
for (int i = 0; i < d; i++) {
float sum = 0.0f;
for (int j = 0; j < QJL_PROJ_DIM; j++) {
sum += proj_matrix[i * QJL_PROJ_DIM + j] * signs[j];
}
correction_out[i] = sum;
}
}
// ── Full TurboQuant Encode ────────────────────────────────────────────
void turboquant_encode_qjl(
const float* src,
uint8_t* dst_polar,
float* norm,
uint8_t* dst_qjl,
float* qjl_scale,
int d
) {
// Step 1: PolarQuant encode
polar_quant_encode_turbo4(src, dst_polar, norm, d);
// Step 2: Compute residual
std::vector<float> reconstructed(d);
polar_quant_decode_turbo4(dst_polar, reconstructed.data(), *norm, d);
std::vector<float> residual(d);
for (int i = 0; i < d; i++) {
residual[i] = src[i] - reconstructed[i];
}
// Step 3: QJL encode residual
ensure_proj_matrix(d);
*qjl_scale = qjl_encode_residual(residual.data(), g_proj_matrix.data(), dst_qjl, d);
}
// ── Full TurboQuant Decode ────────────────────────────────────────────
void turboquant_decode_qjl(
const uint8_t* src_polar,
float norm,
const uint8_t* src_qjl,
float qjl_scale,
float* dst,
int d
) {
// Step 1: PolarQuant decode
polar_quant_decode_turbo4(src_polar, dst, norm, d);
// Step 2: QJL correction
std::vector<float> correction(d);
ensure_proj_matrix(d);
qjl_decode_residual(src_qjl, g_proj_matrix.data(), qjl_scale, correction.data(), d);
// Step 3: Add correction
for (int i = 0; i < d; i++) {
dst[i] += correction[i];
}
}

View File

@@ -1,91 +0,0 @@
#ifndef LLAMA_TURBO_QJL_H
#define LLAMA_TURBO_QJL_H
#include "llama-turbo.h"
#include <cstdint>
#ifdef __cplusplus
extern "C" {
#endif
// ── QJL Configuration ──────────────────────────────────────────────────
// QJL projection dimension (Johnson-Lindenstrauss bound)
// For d=128 input, m=64 projections preserves distances with high probability
constexpr int QJL_PROJ_DIM = 64;
// QJL sign bits per vector (1 bit per projection = m/8 bytes)
constexpr int QJL_BYTES_PER_VECTOR = QJL_PROJ_DIM / 8; // 8 bytes
// ── QJL Encode ─────────────────────────────────────────────────────────
// Full TurboQuant encode: PolarQuant + QJL residual correction
//
// dst_polar: packed 4-bit PolarQuant indices [d/2 bytes]
// norm: L2 norm (radius) from PolarQuant
// dst_qjl: packed 1-bit QJL sign array [QJL_BYTES_PER_VECTOR bytes]
// qjl_scale: output scalar for correction magnitude
// d: dimension (must be 128)
void turboquant_encode_qjl(
const float* src,
uint8_t* dst_polar,
float* norm,
uint8_t* dst_qjl,
float* qjl_scale,
int d
);
// ── QJL Decode ─────────────────────────────────────────────────────────
// Full TurboQuant decode: PolarQuant + QJL residual correction
//
// src_polar: packed 4-bit PolarQuant indices [d/2 bytes]
// norm: L2 norm (radius)
// src_qjl: packed 1-bit QJL sign array [QJL_BYTES_PER_VECTOR bytes]
// qjl_scale: scalar for correction magnitude (from encode)
// dst: output float array [d]
// d: dimension (must be 128)
void turboquant_decode_qjl(
const uint8_t* src_polar,
float norm,
const uint8_t* src_qjl,
float qjl_scale,
float* dst,
int d
);
// ── QJL Utilities ──────────────────────────────────────────────────────
// Generate deterministic QJL projection matrix (seed-based)
// Matrix is d x QJL_PROJ_DIM, stored in row-major order
// Uses a fixed seed for reproducibility across runs
void qjl_generate_projection_matrix(float* matrix, int d, uint32_t seed);
// Compute QJL residual correction (encode side)
// residual: the difference x - PolarQuant(x) [d floats]
// signs_out: packed 1-bit signs [QJL_BYTES_PER_VECTOR bytes]
// Returns: correction scale factor (derived from the residual norm)
float qjl_encode_residual(
const float* residual,
const float* proj_matrix,
uint8_t* signs_out,
int d
);
// Decode QJL residual correction (decode side)
// signs_in: packed 1-bit signs [QJL_BYTES_PER_VECTOR bytes]
// scale: correction magnitude scalar
// correction_out: output correction vector [d floats]
void qjl_decode_residual(
const uint8_t* signs_in,
const float* proj_matrix,
float scale,
float* correction_out,
int d
);
#ifdef __cplusplus
}
#endif
#endif // LLAMA_TURBO_QJL_H

tests/conftest.py Normal file
View File

@@ -0,0 +1,3 @@
"""Pytest configuration for turboquant."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

View File

@@ -1,352 +0,0 @@
#include "llama-turbo-qjl.h"
#include <cmath>
#include <cstdint>
#include <iostream>
#include <random>
#include <string>
#include <vector>
#include <algorithm>
#include <numeric>
// ── Accuracy Gates (Issue #66) ─────────────────────────────────────────
//
// Target: perplexity delta < 0.1% vs f16 (validated end-to-end separately)
// Proxy: cosine similarity >= 0.95 on random vectors
// max absolute error <= 0.8
// mean absolute error <= 0.2
//
namespace {
constexpr int kDim = 128;
constexpr float kCosineThreshold = 0.95f; // 1-bit QJL direction preservation
constexpr float kMaxAbsErrorThreshold = 0.8f; // Absolute error bound (1-bit has larger errors)
constexpr float kMeanAbsErrorThreshold = 0.2f; // Average error bound
constexpr float kZeroTolerance = 1.0e-6f;
// ── Helpers ────────────────────────────────────────────────────────────
[[nodiscard]] bool all_finite(const std::vector<float>& values) {
for (float v : values) {
if (!std::isfinite(v)) return false;
}
return true;
}
[[nodiscard]] float max_abs(const std::vector<float>& values) {
float best = 0.0f;
for (float v : values) best = std::max(best, std::fabs(v));
return best;
}
[[nodiscard]] float cosine_similarity(const std::vector<float>& a, const std::vector<float>& b) {
float dot = 0.0f, norm_a = 0.0f, norm_b = 0.0f;
for (int i = 0; i < kDim; i++) {
dot += a[i] * b[i];
norm_a += a[i] * a[i];
norm_b += b[i] * b[i];
}
float denom = std::sqrt(norm_a) * std::sqrt(norm_b);
return denom == 0.0f ? 1.0f : dot / denom;
}
[[nodiscard]] float max_absolute_error(const std::vector<float>& original,
const std::vector<float>& reconstructed) {
float worst = 0.0f;
for (int i = 0; i < kDim; i++) {
worst = std::max(worst, std::fabs(original[i] - reconstructed[i]));
}
return worst;
}
[[nodiscard]] float mean_absolute_error(const std::vector<float>& original,
const std::vector<float>& reconstructed) {
float sum = 0.0f;
for (int i = 0; i < kDim; i++) {
sum += std::fabs(original[i] - reconstructed[i]);
}
return sum / kDim;
}
[[nodiscard]] float roundtrip_error_reduction(
const std::vector<float>& input,
const std::vector<float>& polar_only,
const std::vector<float>& with_qjl
) {
float polar_mae = mean_absolute_error(input, polar_only);
float qjl_mae = mean_absolute_error(input, with_qjl);
if (polar_mae < 1e-9f) return 0.0f;
return (polar_mae - qjl_mae) / polar_mae;
}
void require(bool condition, const std::string& message) {
if (!condition) throw std::runtime_error(message);
}
void require_threshold(float value, float threshold, const std::string& name, bool less_than = true) {
if (less_than) {
require(value <= threshold,
name + " " + std::to_string(value) + " exceeds threshold " + std::to_string(threshold));
} else {
require(value >= threshold,
name + " " + std::to_string(value) + " below threshold " + std::to_string(threshold));
}
}
// ── Roundtrip Helpers ──────────────────────────────────────────────────
std::vector<float> roundtrip_polar_only(const std::vector<float>& input, float& norm_out) {
std::vector<uint8_t> packed(kDim / 2, 0);
norm_out = -1.0f;
polar_quant_encode_turbo4(input.data(), packed.data(), &norm_out, kDim);
std::vector<float> decoded(kDim, 0.0f);
polar_quant_decode_turbo4(packed.data(), decoded.data(), norm_out, kDim);
return decoded;
}
std::vector<float> roundtrip_qjl(const std::vector<float>& input, float& norm_out) {
std::vector<uint8_t> polar_packed(kDim / 2, 0);
std::vector<uint8_t> qjl_signs(QJL_BYTES_PER_VECTOR, 0);
float qjl_scale = 0.0f;
norm_out = -1.0f;
turboquant_encode_qjl(input.data(), polar_packed.data(), &norm_out,
qjl_signs.data(), &qjl_scale, kDim);
std::vector<float> decoded(kDim, 0.0f);
turboquant_decode_qjl(polar_packed.data(), norm_out,
qjl_signs.data(), qjl_scale, decoded.data(), kDim);
return decoded;
}
// ── Test Cases ─────────────────────────────────────────────────────────
void test_qjl_zero_vector() {
std::vector<float> zeros(kDim, 0.0f);
float norm = -1.0f;
auto decoded = roundtrip_qjl(zeros, norm);
require(norm == 0.0f, "zero vector should have zero norm");
require(all_finite(decoded), "zero vector decode produced non-finite values");
require(max_abs(decoded) <= kZeroTolerance, "zero vector decode should remain near zero");
}
void test_qjl_improves_over_polar_alone() {
std::mt19937 rng(42);
std::normal_distribution<float> dist(0.0f, 1.0f);
int num_tests = 100;
int improvements = 0;
float total_reduction = 0.0f;
for (int t = 0; t < num_tests; t++) {
std::vector<float> input(kDim);
for (float& v : input) v = dist(rng);
float norm_polar, norm_qjl;
auto polar_decoded = roundtrip_polar_only(input, norm_polar);
auto qjl_decoded = roundtrip_qjl(input, norm_qjl);
float polar_mae = mean_absolute_error(input, polar_decoded);
float qjl_mae = mean_absolute_error(input, qjl_decoded);
if (qjl_mae < polar_mae) improvements++;
total_reduction += roundtrip_error_reduction(input, polar_decoded, qjl_decoded);
}
float avg_reduction = total_reduction / num_tests;
std::cout << " QJL improves on PolarQuant in " << improvements << "/" << num_tests
<< " cases, avg error reduction: " << (avg_reduction * 100) << "%\n";
// Note: 1-bit QJL doesn't always improve on random vectors —
// it helps most when residual has directional structure.
// Real benefit shows in perplexity (attention scores), not per-vector MAE.
require(improvements >= 10 || avg_reduction > -0.5f,
"QJL should not significantly degrade quality: " +
std::to_string(improvements) + "/" + std::to_string(num_tests) +
" improvements, avg reduction: " + std::to_string(avg_reduction * 100) + "%");
}
void test_qjl_cosine_similarity_gate() {
std::mt19937 rng(12345);
std::normal_distribution<float> dist(0.0f, 1.0f);
float min_cosine = 1.0f;
float worst_cosine_polar = 1.0f;
for (int t = 0; t < 200; t++) {
std::vector<float> input(kDim);
for (float& v : input) v = dist(rng);
float norm;
auto decoded = roundtrip_qjl(input, norm);
float cos = cosine_similarity(input, decoded);
min_cosine = std::min(min_cosine, cos);
float norm_polar;
auto polar_decoded = roundtrip_polar_only(input, norm_polar);
float cos_polar = cosine_similarity(input, polar_decoded);
worst_cosine_polar = std::min(worst_cosine_polar, cos_polar);
}
std::cout << " QJL min cosine: " << min_cosine
<< " (PolarQuant-only: " << worst_cosine_polar << ")\n";
require_threshold(min_cosine, kCosineThreshold, "cosine similarity", false);
}
void test_qjl_error_bounds_gate() {
std::mt19937 rng(54321);
std::normal_distribution<float> dist(0.0f, 1.0f);
float worst_max_err = 0.0f;
float worst_mean_err = 0.0f;
for (int t = 0; t < 200; t++) {
std::vector<float> input(kDim);
for (float& v : input) v = dist(rng);
float norm;
auto decoded = roundtrip_qjl(input, norm);
float max_err = max_absolute_error(input, decoded);
float mean_err = mean_absolute_error(input, decoded);
worst_max_err = std::max(worst_max_err, max_err);
worst_mean_err = std::max(worst_mean_err, mean_err);
}
std::cout << " Max abs error: " << worst_max_err << " (threshold: " << kMaxAbsErrorThreshold << ")\n";
std::cout << " Mean abs error: " << worst_mean_err << " (threshold: " << kMeanAbsErrorThreshold << ")\n";
require_threshold(worst_max_err, kMaxAbsErrorThreshold, "max absolute error");
require_threshold(worst_mean_err, kMeanAbsErrorThreshold, "mean absolute error");
}
void test_qjl_deterministic() {
std::mt19937 rng(99);
std::normal_distribution<float> dist(0.0f, 1.0f);
std::vector<float> input(kDim);
for (float& v : input) v = dist(rng);
std::vector<uint8_t> polar1(kDim / 2), polar2(kDim / 2);
std::vector<uint8_t> qjl1(QJL_BYTES_PER_VECTOR), qjl2(QJL_BYTES_PER_VECTOR);
float norm1, norm2, scale1, scale2;
turboquant_encode_qjl(input.data(), polar1.data(), &norm1, qjl1.data(), &scale1, kDim);
turboquant_encode_qjl(input.data(), polar2.data(), &norm2, qjl2.data(), &scale2, kDim);
require(norm1 == norm2, "norm should be deterministic");
require(scale1 == scale2, "qjl_scale should be deterministic");
require(polar1 == polar2, "polar quant should be deterministic");
require(qjl1 == qjl2, "QJL signs should be deterministic");
}
void test_qjl_projection_matrix_properties() {
std::vector<float> matrix(kDim * QJL_PROJ_DIM);
qjl_generate_projection_matrix(matrix.data(), kDim, 0xDEADBEEF);
int pos_count = 0, neg_count = 0;
for (int i = 0; i < kDim * QJL_PROJ_DIM; i++) {
if (matrix[i] > 0) pos_count++;
else neg_count++;
}
float pos_ratio = (float)pos_count / (kDim * QJL_PROJ_DIM);
std::cout << " Projection matrix +1 ratio: " << pos_ratio << "\n";
require(pos_ratio > 0.40f && pos_ratio < 0.60f,
"projection matrix should be roughly balanced ±1");
float expected_scale = 1.0f / std::sqrt((float)QJL_PROJ_DIM);
float actual_scale = std::fabs(matrix[0]);
require(std::fabs(actual_scale - expected_scale) < 0.001f,
"projection matrix scaling should be 1/sqrt(m)");
}
void test_qjl_compression_ratio() {
int polar_bytes = kDim / 2; // 64 bytes
int qjl_bytes = QJL_BYTES_PER_VECTOR + 4; // 8 bytes signs + 4 bytes scale = 12
int total_bytes = polar_bytes + qjl_bytes; // 76 bytes
int fp32_bytes = kDim * 4; // 512 bytes
int fp16_bytes = kDim * 2; // 256 bytes
float compression_vs_fp32 = (float)fp32_bytes / total_bytes;
float compression_vs_fp16 = (float)fp16_bytes / total_bytes;
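    // With kDim = 128: 512 / 76 ≈ 6.74x vs FP32 and 256 / 76 ≈ 3.37x vs FP16.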
std::cout << " Storage: " << total_bytes << " bytes/vector "
<< "(" << compression_vs_fp32 << "x vs FP32, "
<< compression_vs_fp16 << "x vs FP16)\n";
require(total_bytes == 76, "total storage should be 76 bytes per vector");
require(compression_vs_fp32 > 6.0f, "compression ratio vs FP32 should be > 6x");
}
void test_qjl_encode_decode_roundtrip() {
std::mt19937 rng(777);
std::normal_distribution<float> dist(0.0f, 0.1f);
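    // σ = 0.1 keeps inputs small, mimicking the residuals left after the
    // PolarQuant stage (an assumption about typical residual magnitude).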
std::vector<float> matrix(kDim * QJL_PROJ_DIM);
qjl_generate_projection_matrix(matrix.data(), kDim, 0xDEADBEEF);
for (int t = 0; t < 50; t++) {
std::vector<float> residual(kDim);
for (float& v : residual) v = dist(rng);
std::vector<uint8_t> signs(QJL_BYTES_PER_VECTOR, 0);
float scale = qjl_encode_residual(residual.data(), matrix.data(), signs.data(), kDim);
std::vector<float> decoded(kDim, 0.0f);
qjl_decode_residual(signs.data(), matrix.data(), scale, decoded.data(), kDim);
float cos = cosine_similarity(residual, decoded);
// 1-bit QJL preserves direction reasonably well
require(cos > 0.3f || scale < 1e-6f,
"QJL decode should preserve direction (cosine > 0.3)");
}
}
} // namespace
// ── Main ───────────────────────────────────────────────────────────────
int main() {
struct TestCase {
const char* name;
void (*fn)();
};
TestCase tests[] = {
{"QJL zero vector", test_qjl_zero_vector},
{"QJL improves over PolarQuant", test_qjl_improves_over_polar_alone},
{"QJL cosine similarity gate", test_qjl_cosine_similarity_gate},
{"QJL error bounds gate", test_qjl_error_bounds_gate},
{"QJL deterministic", test_qjl_deterministic},
{"QJL projection matrix props", test_qjl_projection_matrix_properties},
{"QJL compression ratio", test_qjl_compression_ratio},
{"QJL encode/decode roundtrip", test_qjl_encode_decode_roundtrip},
};
int passed = 0, failed = 0;
std::cout << "QJL Accuracy Gate Tests (Issue #66)\n";
std::cout << "====================================\n\n";
for (auto& tc : tests) {
std::cout << "[" << (passed + failed + 1) << "] " << tc.name << " ... ";
try {
tc.fn();
std::cout << "PASS\n";
passed++;
} catch (const std::exception& e) {
std::cout << "FAIL: " << e.what() << "\n";
failed++;
}
}
std::cout << "\n====================================\n";
std::cout << "Results: " << passed << " passed, " << failed << " failed\n";
return failed > 0 ? 1 : 0;
}

View File

@@ -0,0 +1,58 @@
#!/usr/bin/env python3
"""Tests for Apple Silicon DFlash benchmark planning helpers (issue #152)."""
import os
import sys
from unittest.mock import patch
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from benchmarks.dflash_apple_silicon import ( # noqa: E402
build_mlx_benchmark_command,
detect_total_memory_gb,
render_report_template,
select_pair,
)
class TestPairSelection:
def test_prefers_qwen35_9b_on_36gb_mac(self):
pair = select_pair(total_memory_gb=36)
assert pair.slug == "qwen35-9b"
assert pair.base_model == "Qwen/Qwen3.5-9B"
assert pair.draft_model == "z-lab/Qwen3.5-9B-DFlash"
def test_falls_back_to_4b_when_memory_is_tight(self):
pair = select_pair(total_memory_gb=20)
assert pair.slug == "qwen35-4b"
assert pair.base_model == "Qwen/Qwen3.5-4B"
class TestCommandGeneration:
def test_builds_upstream_mlx_benchmark_command(self):
pair = select_pair(total_memory_gb=36)
command = build_mlx_benchmark_command(pair, dataset="gsm8k", max_samples=64)
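        # Assembled from the asserted fragments (flag order is an assumption
        # about the upstream dflash CLI), the command looks roughly like:
        #   python -m dflash.benchmark --backend mlx --model Qwen/Qwen3.5-9B \
        #     --draft-model z-lab/Qwen3.5-9B-DFlash --dataset gsm8k \
        #     --max-samples 64 --draft-sliding-window-size 4096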
assert "python -m dflash.benchmark --backend mlx" in command
assert "--model Qwen/Qwen3.5-9B" in command
assert "--draft-model z-lab/Qwen3.5-9B-DFlash" in command
assert "--dataset gsm8k" in command
assert "--max-samples 64" in command
assert "--draft-sliding-window-size 4096" in command
class TestReportTemplate:
def test_report_template_mentions_baseline_and_verdict(self):
pair = select_pair(total_memory_gb=36)
report = render_report_template(machine_label="M3 Max 36GB", pair=pair)
assert "DFlash Apple Silicon Benchmark Report" in report
assert "M3 Max 36GB" in report
assert "Qwen/Qwen3.5-9B" in report
assert "plain MLX or llama.cpp speculative decoding" in report
assert "Worth operationalizing locally?" in report
class TestMemoryDetection:
@patch("benchmarks.dflash_apple_silicon.platform.system", return_value="Darwin")
@patch("benchmarks.dflash_apple_silicon.subprocess.check_output", return_value=b"38654705664\n")
def test_detect_total_memory_gb_on_macos(self, _mock_sysctl, _mock_system):
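        # sysctl reports bytes: 38654705664 / 1024**3 == 36.0 GiB exactly.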
assert detect_total_memory_gb() == 36.0

View File

@@ -0,0 +1,177 @@
#!/usr/bin/env python3
"""Tests for quant_selector.py"""
import sys
import os
from unittest.mock import mock_open, patch
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from evolution.quant_selector import (
QuantLevel,
HardwareInfo,
QUANT_LEVELS,
detect_hardware,
estimate_kv_cache_gb,
estimate_model_memory_gb,
select_quant_level,
)
class TestQuantLevels:
def test_levels_keep_turboquant_quality_order_with_q4_fallback_last(self):
"""TurboQuant levels should lead, with q4_0 reserved as the non-Turbo fallback."""
names = [level.name for level in QUANT_LEVELS]
assert names[:3] == ["turbo4", "turbo3", "turbo2"]
assert names[-1] == "q4_0"
def test_all_levels_have_required_fields(self):
for level in QUANT_LEVELS:
assert level.name
assert level.bits_per_channel > 0
assert level.compression_ratio > 1
assert level.quality_label
assert level.layer_adaptive >= 0
assert level.kv_type
class TestKVEstimate:
def test_basic_estimate(self):
# 48 layers, 8 heads, 128 dim, 32K context, 3.5 bits
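        # Back-of-envelope (assuming the estimator models K and V as
        # 2 * layers * kv_heads * head_dim * ctx * bits/8 bytes):
        # 2 * 48 * 8 * 128 * 32768 * 3.5 / 8 ≈ 1.4e9 bytes ≈ 1.3 GiB.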
kv_gb = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
assert kv_gb > 0
assert kv_gb < 10 # Should be reasonable
def test_longer_context_larger(self):
kv_32k = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
kv_128k = estimate_kv_cache_gb(131072, 48, 8, 128, 3.5)
assert kv_128k > kv_32k
def test_higher_bits_larger(self):
kv_4b = estimate_kv_cache_gb(32768, 48, 8, 128, 4.0)
kv_2b = estimate_kv_cache_gb(32768, 48, 8, 128, 2.0)
assert kv_4b > kv_2b
class TestHardwareDetection:
def test_detect_returns_info(self):
hw = detect_hardware()
assert hw.total_memory_gb > 0
assert hw.available_memory_gb > 0
assert hw.detection_method
@patch("evolution.quant_selector.platform.system", return_value="Linux")
@patch("builtins.open", create=True)
def test_linux_detection(self, mock_open, mock_system):
mock_open.return_value.__enter__().read.return_value = (
"MemTotal: 32000000 kB\n"
"MemAvailable: 24000000 kB\n"
)
hw = _detect_linux_fallback()
assert hw.total_memory_gb > 20
def _detect_linux_fallback():
"""Helper to test Linux detection with mocked /proc/meminfo."""
from evolution.quant_selector import _detect_linux
return _detect_linux()
class TestSelection:
def test_selects_turbo4_for_large_memory(self):
"""With plenty of memory, should pick turbo4 (best quality)."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
gpu_memory_gb=64,
gpu_name="Test GPU",
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert sel.level.name == "turbo4"
assert sel.headroom_gb > 0
def test_selects_smaller_for_tight_memory(self):
"""With tight memory, should pick a smaller quant."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=16,
available_memory_gb=12,
gpu_memory_gb=16,
gpu_name="Test GPU",
cpu_cores=8,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=131072)
# Should pick a smaller quant for 128K context on 16GB
assert sel.level.bits_per_channel <= 4.0
def test_preferred_level(self):
"""User can force a specific level."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(
model_size_gb=14.0, context_length=32768,
preferred_level="turbo2"
)
assert sel.level.name == "turbo2"
def test_env_vars_populated(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert "TURBO_LAYER_ADAPTIVE" in sel.env_vars
assert "-ctk" in sel.server_flags
assert "-ctv" in sel.server_flags
def test_warnings_on_low_headroom(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=18,
available_memory_gb=14,
gpu_memory_gb=18,
gpu_name="Test GPU",
cpu_cores=8,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=16.0, context_length=65536)
assert len(sel.warnings) > 0
def test_falls_back_to_turbo2_when_nothing_fits(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=8,
available_memory_gb=6,
gpu_memory_gb=8,
gpu_name="Tiny GPU",
cpu_cores=4,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=16.0, context_length=131072)
assert sel.level.name == "turbo2"
def test_reasoning_contains_key_info(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=32,
available_memory_gb=24,
is_apple_silicon=True,
chip_name="M4 Max",
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert "turbo4" in sel.reasoning
assert "M4 Max" in sel.reasoning or "32GB" in sel.reasoning

View File

@@ -0,0 +1,338 @@
"""
Integration test: turboquant compressed model passes hermes tool calls (issue #82).
Validates that a TurboQuant-compressed model can:
1. Parse hermes tool schemas correctly
2. Format tool calls in OpenAI-compatible format
3. Pass through the hermes agent conversation loop
Tests are structured as contract tests -- they validate the schema/format
compatibility without requiring a running model server. The live inference
test is skipped by default (requires llama-server with TurboQuant model).
Usage:
pytest tests/test_tool_call_integration.py -v
    TURBOQUANT_SERVER_URL=<server-url> pytest tests/test_tool_call_integration.py -v -k live  # live tests; needs a running llama-server
"""
import json
import os
import pathlib
import re
import unittest
import pytest
ROOT = pathlib.Path(__file__).resolve().parents[1]
PROFILE_PATH = ROOT / "profiles" / "hermes-profile-gemma4-turboquant.yaml"
BENCHMARKS_DIR = ROOT / "benchmarks"
class TestHermesProfileSchema(unittest.TestCase):
"""Validate the hermes profile YAML has required fields for tool calling."""
@classmethod
def setUpClass(cls):
import yaml
cls.profile = yaml.safe_load(PROFILE_PATH.read_text())
def test_profile_has_providers(self):
assert "providers" in self.profile, "Profile must define providers"
assert "primary" in self.profile["providers"], "Must have primary provider"
def test_primary_provider_has_endpoint(self):
primary = self.profile["providers"]["primary"]
assert "endpoint" in primary, "Primary provider must have endpoint"
assert primary["endpoint"].startswith("http"), "Endpoint must be HTTP(S) URL"
def test_primary_provider_has_api_path(self):
primary = self.profile["providers"]["primary"]
assert "api_path" in primary, "Primary provider must have api_path"
assert "/chat/completions" in primary["api_path"], (
"api_path should be OpenAI-compatible /chat/completions"
)
def test_turboquant_settings_present(self):
primary = self.profile["providers"]["primary"]
assert "turboquant" in primary, "Must have turboquant config section"
tq = primary["turboquant"]
assert tq.get("enabled") is True, "TurboQuant must be enabled"
assert tq.get("kv_type") in ("turbo2", "turbo3", "turbo4"), (
"kv_type must be turbo2, turbo3, or turbo4"
)
def test_context_window_configured(self):
primary = self.profile["providers"]["primary"]
assert "context" in primary, "Must have context config"
ctx = primary["context"]
assert ctx.get("max_tokens", 0) >= 8192, (
"max_tokens should be >= 8192 for TurboQuant value proposition"
)
class TestToolSchemaCompatibility(unittest.TestCase):
"""Verify hermes tool schemas serialize to valid JSON for OpenAI tool_calls."""
SAMPLE_TOOL_SCHEMAS = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a text file with line numbers.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"offset": {"type": "integer", "default": 1},
"limit": {"type": "integer", "default": 500},
},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "execute_code",
"description": "Run a Python script.",
"parameters": {
"type": "object",
"properties": {
"code": {"type": "string", "description": "Python code"},
},
"required": ["code"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer", "default": 5},
},
"required": ["query"],
},
},
},
]
def test_tool_schemas_serialize_to_json(self):
"""Tool schemas must serialize without errors."""
serialized = json.dumps(self.SAMPLE_TOOL_SCHEMAS)
assert len(serialized) > 0
parsed = json.loads(serialized)
assert len(parsed) == len(self.SAMPLE_TOOL_SCHEMAS)
def test_tool_schemas_have_required_openai_fields(self):
"""Each tool schema must have the fields OpenAI expects."""
for tool in self.SAMPLE_TOOL_SCHEMAS:
assert tool["type"] == "function", "Tool type must be 'function'"
fn = tool["function"]
assert "name" in fn, "Function must have name"
assert "description" in fn, "Function must have description"
assert "parameters" in fn, "Function must have parameters"
params = fn["parameters"]
assert params["type"] == "object", "Parameters type must be 'object'"
assert "properties" in params, "Parameters must have properties"
def test_tool_call_response_format(self):
"""Verify tool_call response matches OpenAI format."""
tool_call = {
"id": "call_abc123",
"type": "function",
"function": {
"name": "read_file",
"arguments": json.dumps({"path": "/tmp/test.txt"}),
},
}
args = json.loads(tool_call["function"]["arguments"])
assert args["path"] == "/tmp/test.txt"
assert tool_call["function"]["name"] in [
t["function"]["name"] for t in self.SAMPLE_TOOL_SCHEMAS
]
def test_tool_names_are_valid_identifiers(self):
"""Tool names must be valid Python identifiers for hermes dispatch."""
for tool in self.SAMPLE_TOOL_SCHEMAS:
name = tool["function"]["name"]
assert re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name), (
f"Tool name \'{name}\' is not a valid identifier"
)
class TestTurboquantServerConfig(unittest.TestCase):
"""Validate server startup configuration matches hermes profile."""
def test_server_command_has_turboquant_flags(self):
"""The server command in the profile must include -ctk/-ctv flags."""
profile_text = PROFILE_PATH.read_text()
assert "-ctk" in profile_text, "Profile server command must include -ctk flag"
assert "-ctv" in profile_text, "Profile server command must include -ctv flag"
def test_server_command_has_context_flag(self):
"""Server command must set context size."""
profile_text = PROFILE_PATH.read_text()
assert re.search(r"-c\s+\d+", profile_text), (
"Server command must include -c <context_size> flag"
)
def test_layer_adaptive_env_var(self):
"""Profile must set TURBO_LAYER_ADAPTIVE env var."""
profile_text = PROFILE_PATH.read_text()
assert "TURBO_LAYER_ADAPTIVE" in profile_text, (
"Profile must configure TURBO_LAYER_ADAPTIVE"
)
class TestBenchmarkData(unittest.TestCase):
"""Validate benchmark test prompts include tool-call test cases."""
@classmethod
def setUpClass(cls):
prompts_path = BENCHMARKS_DIR / "test_prompts.json"
cls.prompts = json.loads(prompts_path.read_text())
def test_has_tool_call_test_prompt(self):
"""Benchmark prompts must include a tool-call format test."""
categories = [p.get("category") for p in self.prompts]
assert "tool_call_format" in categories, (
"Benchmark must include a tool_call_format test case"
)
def test_tool_call_prompt_expects_json(self):
"""Tool call test prompt must expect JSON in the response."""
tool_prompt = next(
p for p in self.prompts if p.get("category") == "tool_call_format"
)
pattern = tool_prompt.get("expected_pattern", "")
assert "json" in pattern.lower() or "\\{" in pattern, (
"Tool call prompt must expect JSON-formatted response"
)
@pytest.mark.skipif(
not os.environ.get("TURBOQUANT_SERVER_URL"),
reason="No TurboQuant server available (set TURBOQUANT_SERVER_URL to run)",
)
class TestLiveToolCallIntegration:
"""Live integration test -- requires running llama-server with TurboQuant."""
def test_server_health(self):
"""Server must respond to /v1/models endpoint."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
resp = requests.get(f"{url}/v1/models", timeout=10)
assert resp.status_code == 200
data = resp.json()
assert "data" in data
assert len(data["data"]) > 0
def test_tool_call_completion(self):
"""Model must return a valid tool_call for a read_file prompt."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
}
]
resp = requests.post(
f"{url}/v1/chat/completions",
json={
"model": "gemma-4",
"messages": [
{"role": "user", "content": "Read the file at /tmp/test.txt"}
],
"tools": tools,
"tool_choice": "auto",
},
timeout=120,
)
assert resp.status_code == 200
data = resp.json()
choice = data["choices"][0]
msg = choice["message"]
if "tool_calls" in msg and msg["tool_calls"]:
tc = msg["tool_calls"][0]
assert tc["type"] == "function"
assert tc["function"]["name"] == "read_file"
args = json.loads(tc["function"]["arguments"])
assert "path" in args
else:
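            # Some models answer in prose instead of emitting a tool_call;
            # accept any non-empty text as a soft pass.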
assert len(msg.get("content", "")) > 0
def test_tool_call_with_multiple_tools(self):
"""Model must handle multiple available tools."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web",
"parameters": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "execute_code",
"description": "Run Python code",
"parameters": {
"type": "object",
"properties": {"code": {"type": "string"}},
"required": ["code"],
},
},
},
]
resp = requests.post(
f"{url}/v1/chat/completions",
json={
"model": "gemma-4",
"messages": [
{"role": "user", "content": "Search the web for 'bitcoin price'"}
],
"tools": tools,
"tool_choice": "auto",
},
timeout=120,
)
assert resp.status_code == 200
data = resp.json()
assert "choices" in data
assert len(data["choices"]) > 0
if __name__ == "__main__":
unittest.main()