Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
97240a4191 docs: Update stale forge URLs from IP to domain (closes #67)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 14s
Replace http://143.198.27.163:3000 with https://forge.alexanderwhitestone.com
in README.md and docs/PROJECT_STATUS.md.
2026-04-14 23:10:05 -04:00
13 changed files with 5 additions and 2089 deletions

3
.gitignore vendored
View File

@@ -1,3 +0,0 @@
build/
*.pyc
__pycache__/

View File

@@ -1,36 +0,0 @@
cmake_minimum_required(VERSION 3.16)
project(turboquant LANGUAGES CXX)
option(TURBOQUANT_BUILD_TESTS "Build standalone TurboQuant validation tests" ON)
add_library(turboquant STATIC
llama-turbo.cpp
)
target_include_directories(turboquant PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}
)
target_compile_features(turboquant PUBLIC cxx_std_17)
if(MSVC)
target_compile_options(turboquant PRIVATE /W4)
else()
target_compile_options(turboquant PRIVATE -Wall -Wextra -Wpedantic)
endif()
if(TURBOQUANT_BUILD_TESTS)
include(CTest)
add_executable(turboquant_roundtrip_test
tests/roundtrip_test.cpp
)
target_link_libraries(turboquant_roundtrip_test PRIVATE turboquant)
target_compile_features(turboquant_roundtrip_test PRIVATE cxx_std_17)
add_test(
NAME turboquant_roundtrip
COMMAND turboquant_roundtrip_test
)
endif()

View File

@@ -29,4 +29,4 @@ See [issues](https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant/i
- [rachittshah/mlx-turboquant](https://github.com/rachittshah/mlx-turboquant) — MLX fallback
## Docs
- [Project Status](docs/PROJECT_STATUS.md) — Full project status and build specification
- [BUILD-SPEC.md](BUILD-SPEC.md) — Full build specification (Strago, v2.2)

View File

@@ -385,7 +385,7 @@ Step 7: If pass → production. If fail → drop to turbo3 or adjust per-layer p
---
*Repo: http://143.198.27.163:3000/Timmy_Foundation/turboquant*
*Repo: https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant*
*Build: /tmp/llama-cpp-turboquant/build/bin/ (all binaries)*
*Branch: feature/turboquant-kv-cache*

View File

@@ -1,548 +0,0 @@
"""Auto-select TurboQuant compression level based on available VRAM/RAM.
Detects hardware resources at startup and picks the highest quality
quantization level that fits within available memory. Supports Apple
Silicon unified memory, NVIDIA GPUs (via nvidia-smi), and CPU-only fallback.
Usage:
from evolution.quant_selector import select_quant_level
selection = select_quant_level(model_size_gb=14.0, context_length=32768)
print(selection.level) # "turbo4"
print(selection.reasoning) # "M4 Max 36GB unified: turbo4 fits 14.0GB model + ..."
print(selection.env_vars) # {"TURBO_LAYER_ADAPTIVE": "7"}
"""
import logging
import os
import platform
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# ── Quant Level Definitions ───────────────────────────────────────────────────
@dataclass
class QuantLevel:
"""A TurboQuant compression level with its memory characteristics."""
name: str # e.g. "turbo4"
bits_per_channel: float # e.g. 3.5 for turbo4
compression_ratio: float # vs uncompressed KV cache
quality_label: str # "best", "high", "balanced", "fast"
layer_adaptive: int # TURBO_LAYER_ADAPTIVE value (0-7)
kv_type: str # -ctk/-ctv flag value
min_memory_headroom_gb: float # Minimum free memory to recommend this level
description: str = ""
# Ordered from highest quality to most aggressive compression
QUANT_LEVELS = [
QuantLevel(
name="turbo4",
bits_per_channel=3.5,
compression_ratio=4.2,
quality_label="best",
layer_adaptive=7,
kv_type="turbo4",
min_memory_headroom_gb=4.0,
description="PolarQuant + QJL 4-bit. Best quality, ~4.2x KV compression."
),
QuantLevel(
name="turbo3",
bits_per_channel=2.5,
compression_ratio=6.0,
quality_label="high",
layer_adaptive=5,
kv_type="turbo3",
min_memory_headroom_gb=3.0,
description="3-bit TurboQuant. High quality, ~6x KV compression."
),
QuantLevel(
name="turbo2",
bits_per_channel=1.5,
compression_ratio=10.0,
quality_label="balanced",
layer_adaptive=3,
kv_type="turbo2",
min_memory_headroom_gb=2.0,
description="2-bit TurboQuant. Balanced, ~10x KV compression."
),
QuantLevel(
name="q4_0",
bits_per_channel=4.0,
compression_ratio=3.5,
quality_label="fast",
layer_adaptive=0,
kv_type="q4_0",
min_memory_headroom_gb=1.5,
description="Standard 4-bit quant. Fast fallback, no TurboQuant."
),
]
# ── Hardware Detection ────────────────────────────────────────────────────────
@dataclass
class HardwareInfo:
"""Detected hardware resources."""
total_memory_gb: float
available_memory_gb: float
gpu_memory_gb: Optional[float] = None
gpu_name: Optional[str] = None
is_apple_silicon: bool = False
chip_name: Optional[str] = None
cpu_cores: int = 0
detection_method: str = ""
def detect_hardware() -> HardwareInfo:
"""Detect available memory and GPU resources."""
system = platform.system()
if system == "Darwin":
return _detect_apple_silicon()
elif system == "Linux":
return _detect_linux()
else:
return _detect_generic(system)
def _detect_apple_silicon() -> HardwareInfo:
"""Detect Apple Silicon unified memory."""
info = HardwareInfo(
total_memory_gb=0,
available_memory_gb=0,
is_apple_silicon=True,
detection_method="sysctl",
)
try:
# Get total memory
result = subprocess.run(
["sysctl", "-n", "hw.memsize"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.total_memory_gb = int(result.stdout.strip()) / (1024**3)
# Get chip name
result = subprocess.run(
["sysctl", "-n", "machdep.cpu.brand_string"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.chip_name = result.stdout.strip()
# Try to get GPU name (Apple Silicon)
result = subprocess.run(
["system_profiler", "SPDisplaysDataType"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
for line in result.stdout.split("\n"):
if "Chipset" in line or "GPU" in line:
info.gpu_name = line.split(":")[-1].strip()
break
# Estimate available memory (vm_stat)
result = subprocess.run(
["vm_stat"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
page_size = 4096 # macOS default
free_pages = 0
for line in result.stdout.split("\n"):
if "Pages free:" in line:
try:
free_pages = int(line.split(":")[-1].strip().rstrip("."))
except ValueError:
pass
# Available ≈ free + some speculative (conservative: just free)
info.available_memory_gb = (free_pages * page_size) / (1024**3)
# Fallback if vm_stat parsing failed
if info.available_memory_gb < 1:
# Conservative: 70% of total
info.available_memory_gb = info.total_memory_gb * 0.70
# Apple Silicon shares memory — GPU memory = total memory
info.gpu_memory_gb = info.total_memory_gb
# Detect CPU cores
result = subprocess.run(
["sysctl", "-n", "hw.ncpu"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.cpu_cores = int(result.stdout.strip())
except Exception as e:
logger.warning(f"Apple Silicon detection failed: {e}")
# Fallback
info.total_memory_gb = 16.0
info.available_memory_gb = 12.0
info.detection_method = "fallback"
return info
def _detect_linux() -> HardwareInfo:
"""Detect Linux system with optional NVIDIA GPU."""
info = HardwareInfo(
total_memory_gb=0,
available_memory_gb=0,
detection_method="proc",
)
try:
# Read /proc/meminfo
with open("/proc/meminfo", "r") as f:
meminfo = f.read()
for line in meminfo.split("\n"):
if line.startswith("MemTotal:"):
kb = int(line.split()[1])
info.total_memory_gb = kb / (1024 * 1024)
elif line.startswith("MemAvailable:"):
kb = int(line.split()[1])
info.available_memory_gb = kb / (1024 * 1024)
# CPU cores
info.cpu_cores = os.cpu_count() or 1
# Check for NVIDIA GPU
try:
result = subprocess.run(
["nvidia-smi", "--query-gpu=name,memory.total,memory.free",
"--format=csv,noheader,nounits"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
lines = result.stdout.strip().split("\n")
if lines:
parts = lines[0].split(", ")
if len(parts) >= 3:
info.gpu_name = parts[0].strip()
info.gpu_memory_gb = float(parts[1]) / 1024 # MB to GB
gpu_free = float(parts[2]) / 1024
# Use GPU free for VRAM-based selection
info.available_memory_gb = max(info.available_memory_gb, gpu_free)
info.detection_method = "nvidia-smi"
except (FileNotFoundError, subprocess.TimeoutExpired):
pass # No NVIDIA GPU
except Exception as e:
logger.warning(f"Linux detection failed: {e}")
info.total_memory_gb = 16.0
info.available_memory_gb = 12.0
info.detection_method = "fallback"
return info
def _detect_generic(system: str) -> HardwareInfo:
"""Fallback detection for unknown systems."""
import psutil
mem = psutil.virtual_memory()
return HardwareInfo(
total_memory_gb=mem.total / (1024**3),
available_memory_gb=mem.available / (1024**3),
cpu_cores=os.cpu_count() or 1,
detection_method="psutil",
)
# ── KV Cache Memory Estimation ───────────────────────────────────────────────
def estimate_kv_cache_gb(
context_length: int,
num_layers: int = 48,
num_kv_heads: int = 8,
head_dim: int = 128,
bits_per_channel: float = 3.5,
) -> float:
"""Estimate KV cache memory for given parameters.
Formula: 2 (K+V) × layers × kv_heads × head_dim × context_length × bits/8
"""
bytes_per_element = bits_per_channel / 8.0
total_bytes = 2 * num_layers * num_kv_heads * head_dim * context_length * bytes_per_element
return total_bytes / (1024**3)
def estimate_model_memory_gb(model_size_gb: float, quant_type: str = "q4_k_m") -> float:
"""Estimate model weights memory. Returns loaded size in GB.
This is a rough estimate — actual depends on exact quant format.
"""
# Common quant ratios (vs fp16)
quant_multipliers = {
"f16": 1.0,
"q8_0": 0.5,
"q6_k": 0.42,
"q5_k_m": 0.37,
"q4_k_m": 0.32,
"q3_k_m": 0.27,
"q2_k": 0.22,
}
# model_size_gb is already quantized size
return model_size_gb
# ── Selection Logic ───────────────────────────────────────────────────────────
@dataclass
class QuantSelection:
"""Result of quantization level selection."""
level: QuantLevel
hardware: HardwareInfo
reasoning: str
total_required_gb: float
available_gb: float
headroom_gb: float
env_vars: dict = field(default_factory=dict)
server_flags: dict = field(default_factory=dict)
warnings: list = field(default_factory=list)
def select_quant_level(
model_size_gb: float = 14.0,
context_length: int = 32768,
num_layers: int = 48,
num_kv_heads: int = 8,
head_dim: int = 128,
preferred_level: Optional[str] = None,
force_cpu: bool = False,
) -> QuantSelection:
"""Select the best quantization level for available hardware.
Args:
model_size_gb: Size of the model weights in GB
context_length: Target context length
num_layers: Number of transformer layers
num_kv_heads: Number of KV attention heads
head_dim: Dimension per attention head
preferred_level: Force a specific level (still checks if it fits)
force_cpu: If True, ignore GPU memory
Returns:
QuantSelection with the chosen level and reasoning
"""
hw = detect_hardware()
if force_cpu:
hw.gpu_memory_gb = None
hw.gpu_name = None
# Use the most restrictive memory constraint
# For Apple Silicon: unified memory, use total
# For NVIDIA: use GPU VRAM
# For CPU-only: use system RAM
if hw.gpu_memory_gb and hw.gpu_name:
memory_pool_gb = hw.gpu_memory_gb
memory_label = f"{hw.gpu_name} {hw.gpu_memory_gb:.0f}GB VRAM"
elif hw.is_apple_silicon:
memory_pool_gb = hw.total_memory_gb
memory_label = f"{hw.chip_name or 'Apple Silicon'} {hw.total_memory_gb:.0f}GB unified"
else:
memory_pool_gb = hw.total_memory_gb
memory_label = f"{hw.cpu_cores}c CPU {hw.total_memory_gb:.0f}GB RAM"
model_mem = estimate_model_memory_gb(model_size_gb)
# Try levels from best to most compressed
chosen = None
for level in QUANT_LEVELS:
if preferred_level and level.name != preferred_level:
continue
kv_mem = estimate_kv_cache_gb(
context_length, num_layers, num_kv_heads, head_dim,
level.bits_per_channel
)
total_required = model_mem + kv_mem
headroom = memory_pool_gb - total_required
if headroom >= level.min_memory_headroom_gb:
chosen = level
break
if preferred_level and level.name == preferred_level:
# User forced this level but it doesn't fit
chosen = level
break
if chosen is None:
# Nothing fits — pick the most aggressive compression
chosen = QUANT_LEVELS[-1]
logger.warning(f"No quant level fits in {memory_pool_gb:.1f}GB. Using {chosen.name}.")
# Calculate final numbers
kv_mem = estimate_kv_cache_gb(
context_length, num_layers, num_kv_heads, head_dim,
chosen.bits_per_channel
)
total_required = model_mem + kv_mem
headroom = memory_pool_gb - total_required
# Build reasoning
reasoning_parts = [
f"{memory_label}:",
f"{chosen.name} ({chosen.quality_label}, {chosen.bits_per_channel:.1f}b/ch,",
f"{chosen.compression_ratio:.1f}x compression)",
f"fits {model_mem:.1f}GB model + {kv_mem:.1f}GB KV cache",
f"@ {context_length}K context = {total_required:.1f}GB / {memory_pool_gb:.0f}GB",
f"({headroom:.1f}GB headroom)"
]
reasoning = " ".join(reasoning_parts)
# Build environment variables for llama.cpp
env_vars = {
"TURBO_LAYER_ADAPTIVE": str(chosen.layer_adaptive),
}
# Build server flags
server_flags = {
"-ctk": chosen.kv_type,
"-ctv": chosen.kv_type,
"-c": str(context_length),
}
# Warnings
warnings = []
if headroom < 2.0:
warnings.append(
f"Low headroom ({headroom:.1f}GB). Consider reducing context length or model size."
)
if headroom < 0:
warnings.append(
f"OVERCOMMITTED: needs {total_required:.1f}GB but only {memory_pool_gb:.0f}GB available. "
f"Inference may fail or swap heavily."
)
selection = QuantSelection(
level=chosen,
hardware=hw,
reasoning=reasoning,
total_required_gb=total_required,
available_gb=memory_pool_gb,
headroom_gb=headroom,
env_vars=env_vars,
server_flags=server_flags,
warnings=warnings,
)
logger.info(f"Quant selection: {reasoning}")
for w in warnings:
logger.warning(w)
return selection
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
"""CLI entry point for quant level selection."""
import argparse
import json
parser = argparse.ArgumentParser(
description="Auto-select TurboQuant compression level based on available hardware"
)
parser.add_argument("--model-size", type=float, default=14.0,
help="Model size in GB (default: 14.0)")
parser.add_argument("--context", type=int, default=32768,
help="Target context length (default: 32768)")
parser.add_argument("--layers", type=int, default=48,
help="Number of transformer layers (default: 48)")
parser.add_argument("--kv-heads", type=int, default=8,
help="Number of KV attention heads (default: 8)")
parser.add_argument("--head-dim", type=int, default=128,
help="Dimension per attention head (default: 128)")
parser.add_argument("--prefer", type=str, default=None,
choices=[l.name for l in QUANT_LEVELS],
help="Prefer a specific quant level")
parser.add_argument("--force-cpu", action="store_true",
help="Ignore GPU, use CPU memory only")
parser.add_argument("--json", action="store_true",
help="JSON output for automation")
parser.add_argument("--detect-only", action="store_true",
help="Only detect hardware, don't select")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(message)s")
if args.detect_only:
hw = detect_hardware()
if args.json:
print(json.dumps(hw.__dict__, default=str, indent=2))
else:
print(f"Total memory: {hw.total_memory_gb:.1f} GB")
print(f"Available: {hw.available_memory_gb:.1f} GB")
if hw.gpu_memory_gb:
print(f"GPU memory: {hw.gpu_memory_gb:.1f} GB")
if hw.gpu_name:
print(f"GPU: {hw.gpu_name}")
if hw.is_apple_silicon:
print(f"Chip: {hw.chip_name or 'Apple Silicon'}")
print(f"CPU cores: {hw.cpu_cores}")
print(f"Detection: {hw.detection_method}")
return
selection = select_quant_level(
model_size_gb=args.model_size,
context_length=args.context,
num_layers=args.layers,
num_kv_heads=args.kv_heads,
head_dim=args.head_dim,
preferred_level=args.prefer,
force_cpu=args.force_cpu,
)
if args.json:
result = {
"level": selection.level.name,
"bits_per_channel": selection.level.bits_per_channel,
"compression_ratio": selection.level.compression_ratio,
"quality": selection.level.quality_label,
"reasoning": selection.reasoning,
"total_required_gb": round(selection.total_required_gb, 2),
"available_gb": round(selection.available_gb, 1),
"headroom_gb": round(selection.headroom_gb, 2),
"env_vars": selection.env_vars,
"server_flags": selection.server_flags,
"warnings": selection.warnings,
"hardware": {
"total_memory_gb": round(selection.hardware.total_memory_gb, 1),
"gpu_name": selection.hardware.gpu_name,
"is_apple_silicon": selection.hardware.is_apple_silicon,
"chip_name": selection.hardware.chip_name,
"cpu_cores": selection.hardware.cpu_cores,
},
}
print(json.dumps(result, indent=2))
else:
print(f"Selected: {selection.level.name} ({selection.level.quality_label})")
print(f" {selection.reasoning}")
print()
print(f"Environment variables:")
for k, v in selection.env_vars.items():
print(f" export {k}={v}")
print()
print(f"Server flags:")
for k, v in selection.server_flags.items():
print(f" {k} {v}")
if selection.warnings:
print()
for w in selection.warnings:
print(f" WARNING: {w}")
if __name__ == "__main__":
main()

View File

@@ -135,5 +135,7 @@ llama-server -m model.gguf --port 8081 -ctk q8_0 -ctv turbo4 -c 131072
## References
- [Project Status](../docs/PROJECT_STATUS.md)
- [TurboQuant Build Spec](../BUILD-SPEC.md)
- [Phase 1 Report](../PHASE1-REPORT.md)
- [Full Knowledge Transfer](../FULL-REPORT.md)
- [llama.cpp TurboQuant Fork](https://github.com/TheTom/llama-cpp-turboquant)

View File

@@ -1,3 +0,0 @@
"""Pytest configuration for turboquant."""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

View File

@@ -1,104 +0,0 @@
#include "llama-turbo.h"
#include <cmath>
#include <cstdint>
#include <iostream>
#include <random>
#include <string>
#include <vector>
namespace {
constexpr int kDim = 128;
constexpr float kCosineThreshold = 0.99f;
constexpr float kZeroTolerance = 1.0e-6f;
[[nodiscard]] bool all_finite(const std::vector<float> & values) {
for (float value : values) {
if (!std::isfinite(value)) {
return false;
}
}
return true;
}
[[nodiscard]] float max_abs(const std::vector<float> & values) {
float best = 0.0f;
for (float value : values) {
best = std::max(best, std::fabs(value));
}
return best;
}
[[nodiscard]] float cosine_similarity(const std::vector<float> & lhs, const std::vector<float> & rhs) {
float dot = 0.0f;
float lhs_norm = 0.0f;
float rhs_norm = 0.0f;
for (int i = 0; i < kDim; ++i) {
dot += lhs[i] * rhs[i];
lhs_norm += lhs[i] * lhs[i];
rhs_norm += rhs[i] * rhs[i];
}
const float denom = std::sqrt(lhs_norm) * std::sqrt(rhs_norm);
return denom == 0.0f ? 1.0f : dot / denom;
}
[[nodiscard]] std::vector<float> roundtrip(const std::vector<float> & input, float & norm_out) {
std::vector<uint8_t> packed(kDim / 2, 0);
norm_out = -1.0f;
polar_quant_encode_turbo4(input.data(), packed.data(), &norm_out, kDim);
std::vector<float> decoded(kDim, 0.0f);
polar_quant_decode_turbo4(packed.data(), decoded.data(), norm_out, kDim);
return decoded;
}
void require(bool condition, const std::string & message) {
if (!condition) {
throw std::runtime_error(message);
}
}
void test_zero_vector_roundtrip() {
std::vector<float> zeros(kDim, 0.0f);
float norm = -1.0f;
const auto decoded = roundtrip(zeros, norm);
require(norm == 0.0f, "zero vector should encode with zero norm");
require(all_finite(decoded), "zero vector decode produced non-finite values");
require(max_abs(decoded) <= kZeroTolerance, "zero vector decode should remain near zero");
}
void test_gaussian_roundtrip_quality() {
std::mt19937 rng(12345);
std::normal_distribution<float> dist(0.0f, 1.0f);
std::vector<float> input(kDim, 0.0f);
for (float & value : input) {
value = dist(rng);
}
float norm = -1.0f;
const auto decoded = roundtrip(input, norm);
require(norm > 0.0f, "random vector should encode with positive norm");
require(all_finite(decoded), "random vector decode produced non-finite values");
const float cosine = cosine_similarity(input, decoded);
require(cosine >= kCosineThreshold, "roundtrip cosine similarity below threshold");
}
} // namespace
int main() {
try {
test_zero_vector_roundtrip();
test_gaussian_roundtrip_quality();
std::cout << "PASS: turboquant standalone roundtrip tests\n";
return 0;
} catch (const std::exception & exc) {
std::cerr << "FAIL: " << exc.what() << '\n';
return 1;
}
}

View File

@@ -1,311 +0,0 @@
#!/usr/bin/env python3
"""Tests for turboquant/auto_select.py — preset selection logic."""
import json
import os
import sys
import tempfile
from pathlib import Path
from unittest.mock import patch, MagicMock
import pytest
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from turboquant.auto_select import (
select_preset,
auto_select,
PRESETS,
QUALITY_ORDER,
SelectionResult,
SystemInfo,
_load_config,
save_config,
format_env_commands,
)
# ── Preset Selection ──────────────────────────────────────────────────────────
class TestSelectPreset:
"""Test preset selection logic."""
def test_high_overhead_best_quality(self):
"""8+ GB overhead -> turboquant_k8v4 (best)."""
result = select_preset(available_gb=20, model_size_gb=10)
assert result.preset == "turboquant_k8v4"
assert result.quality == "best"
assert result.compression_ratio == 2.6
assert result.vllm_compatible is True
def test_medium_overhead_good_quality(self):
"""4-8 GB overhead -> turboquant_4bit_nc (good)."""
result = select_preset(available_gb=12, model_size_gb=6)
assert result.preset == "turboquant_4bit_nc"
assert result.quality == "good"
assert result.compression_ratio == 3.8
def test_low_overhead_usable_quality(self):
"""2-4 GB overhead -> turboquant_3bit_nc (usable)."""
result = select_preset(available_gb=8, model_size_gb=5)
assert result.preset == "turboquant_3bit_nc"
assert result.quality == "usable"
assert result.compression_ratio == 4.9
def test_minimal_overhead_fallback(self):
"""<2 GB overhead -> q4_0 (basic fallback)."""
result = select_preset(available_gb=5, model_size_gb=4)
assert result.preset == "q4_0"
assert result.quality == "basic"
assert result.vllm_compatible is False
def test_negative_overhead_fallback(self):
"""Negative overhead (not enough memory) -> q4_0."""
result = select_preset(available_gb=3, model_size_gb=10)
assert result.preset == "q4_0"
assert result.overhead_gb < 0
assert len(result.warnings) > 0
assert "OVERCOMMITTED" in result.warnings[0]
def test_exact_threshold(self):
"""Exactly at threshold selects that preset."""
result = select_preset(available_gb=12, model_size_gb=4) # 8 GB overhead
assert result.preset == "turboquant_k8v4"
def test_just_below_threshold(self):
"""Just below threshold selects next tier."""
result = select_preset(available_gb=11.9, model_size_gb=4) # 7.9 GB overhead
assert result.preset == "turboquant_4bit_nc"
def test_zero_model_size(self):
"""Zero model size -> all overhead, best preset."""
result = select_preset(available_gb=16, model_size_gb=0)
assert result.preset == "turboquant_k8v4"
# ── vLLM Requirement ─────────────────────────────────────────────────────────
class TestVLLMRequirement:
"""Test vLLM compatibility filtering."""
def test_require_vllm_skips_fallback(self):
"""require_vllm should skip q4_0 (not vLLM compatible)."""
# Very low overhead — without vLLM requirement would pick q4_0
result = select_preset(available_gb=5, model_size_gb=4, require_vllm=True)
# q4_0 is not vLLM compatible; with 1 GB overhead nothing else fits
# so it falls through to q4_0 as final fallback
assert result.preset in QUALITY_ORDER
def test_require_vllm_high_overhead(self):
"""With high overhead, vLLM-compatible preset selected."""
result = select_preset(available_gb=20, model_size_gb=10, require_vllm=True)
assert result.vllm_compatible is True
# ── SelectionResult ───────────────────────────────────────────────────────────
class TestSelectionResult:
"""Test SelectionResult dataclass."""
def test_to_dict(self):
result = select_preset(available_gb=20, model_size_gb=10)
d = result.to_dict()
assert d["preset"] == "turboquant_k8v4"
assert d["quality"] == "best"
assert "env_vars" in d
assert "server_flags" in d
assert "TURBO_LAYER_ADAPTIVE" in d["env_vars"]
assert "-ctk" in d["server_flags"]
def test_env_vars_populated(self):
result = select_preset(available_gb=20, model_size_gb=10)
assert "TURBO_LAYER_ADAPTIVE" in result.env_vars
assert result.env_vars["TURBO_LAYER_ADAPTIVE"] == "7"
def test_server_flags_populated(self):
result = select_preset(available_gb=20, model_size_gb=10)
assert "-ctk" in result.server_flags
assert "-ctv" in result.server_flags
# ── Preset Definitions ────────────────────────────────────────────────────────
class TestPresets:
"""Test preset definitions."""
def test_all_presets_have_required_fields(self):
for name, preset in PRESETS.items():
assert "name" in preset, f"{name} missing 'name'"
assert "description" in preset, f"{name} missing 'description'"
assert "min_overhead_gb" in preset, f"{name} missing 'min_overhead_gb'"
assert "compression_ratio" in preset, f"{name} missing 'compression_ratio'"
assert "quality" in preset, f"{name} missing 'quality'"
assert "vllm_compatible" in preset, f"{name} missing 'vllm_compatible'"
assert "kv_type" in preset, f"{name} missing 'kv_type'"
assert "layer_adaptive" in preset, f"{name} missing 'layer_adaptive'"
def test_quality_order_matches_presets(self):
for name in QUALITY_ORDER:
assert name in PRESETS, f"{name} in QUALITY_ORDER but not in PRESETS"
def test_all_presets_in_quality_order(self):
for name in PRESETS:
assert name in QUALITY_ORDER, f"{name} in PRESETS but not in QUALITY_ORDER"
def test_compression_increases_with_aggressiveness(self):
"""More aggressive presets should have higher compression."""
for i in range(len(QUALITY_ORDER) - 1):
current = PRESETS[QUALITY_ORDER[i]]
next_p = PRESETS[QUALITY_ORDER[i + 1]]
# TurboQuant presets should increase in compression
# q4_0 is a fallback and may break the pattern
if QUALITY_ORDER[i + 1] != "q4_0":
assert current["compression_ratio"] <= next_p["compression_ratio"], (
f"{QUALITY_ORDER[i]} ({current['compression_ratio']}x) should have "
f"<= compression than {QUALITY_ORDER[i+1]} ({next_p['compression_ratio']}x)"
)
def test_overhead_decreases_with_aggressiveness(self):
"""More aggressive presets should have lower overhead requirements."""
for i in range(len(QUALITY_ORDER) - 1):
current = PRESETS[QUALITY_ORDER[i]]
next_p = PRESETS[QUALITY_ORDER[i + 1]]
assert current["min_overhead_gb"] >= next_p["min_overhead_gb"], (
f"{QUALITY_ORDER[i]} (overhead={current['min_overhead_gb']}GB) should have "
f">= overhead than {QUALITY_ORDER[i+1]} (overhead={next_p['min_overhead_gb']}GB)"
)
# ── SystemInfo ────────────────────────────────────────────────────────────────
class TestSystemInfo:
"""Test system detection."""
def test_detect_returns_info(self):
info = SystemInfo.detect()
assert info.total_gb > 0
assert info.available_gb > 0
assert info.detection_method
def test_available_less_than_total(self):
info = SystemInfo.detect()
assert info.available_gb <= info.total_gb
# ── Config ────────────────────────────────────────────────────────────────────
class TestConfig:
"""Test config loading and saving."""
def test_load_config_missing_file(self, tmp_path):
config = _load_config(str(tmp_path / "nonexistent.json"))
assert config == {}
def test_load_config_valid_file(self, tmp_path):
config_file = tmp_path / "turboquant.json"
config_file.write_text(json.dumps({"preset_override": "turboquant_4bit_nc"}))
config = _load_config(str(config_file))
assert config["preset_override"] == "turboquant_4bit_nc"
def test_save_and_load_config(self, tmp_path):
config_file = tmp_path / "turboquant.json"
save_config({"preset_override": "turboquant_k8v4", "context_length": 32768},
str(config_file))
config = _load_config(str(config_file))
assert config["preset_override"] == "turboquant_k8v4"
assert config["context_length"] == 32768
def test_save_config_merges(self, tmp_path):
config_file = tmp_path / "turboquant.json"
config_file.write_text(json.dumps({"existing_key": "value"}))
save_config({"new_key": "new_value"}, str(config_file))
config = _load_config(str(config_file))
assert config["existing_key"] == "value"
assert config["new_key"] == "new_value"
# ── Auto-Select ───────────────────────────────────────────────────────────────
class TestAutoSelect:
"""Test auto_select with mocked system detection."""
def test_auto_select_with_override(self, tmp_path):
config_file = tmp_path / "turboquant.json"
result = auto_select(model_size_gb=14.0, config_override="turboquant_4bit_nc")
assert result.preset == "turboquant_4bit_nc"
assert "Config override" in result.reason
def test_auto_select_unknown_override_ignored(self):
result = auto_select(model_size_gb=14.0, config_override="nonexistent_preset")
# Should fall back to normal detection
assert result.preset in QUALITY_ORDER
def test_auto_select_config_override(self, tmp_path):
"""Config file preset_override is respected."""
config_file = tmp_path / "turboquant.json"
config_file.write_text(json.dumps({"preset_override": "turboquant_3bit_nc"}))
result = auto_select(model_size_gb=14.0, config_path=str(config_file))
assert result.preset == "turboquant_3bit_nc"
def test_auto_select_detects_system(self):
with patch("turboquant.auto_select.SystemInfo.detect") as mock_detect:
mock_detect.return_value = SystemInfo(
total_gb=32, available_gb=24, is_apple_silicon=True,
detection_method="mock"
)
result = auto_select(model_size_gb=14.0)
assert result.system_info is not None
assert result.system_info.total_gb == 32
assert result.preset == "turboquant_k8v4" # 24 - 14 = 10 GB overhead
def test_auto_select_tight_memory(self):
with patch("turboquant.auto_select.SystemInfo.detect") as mock_detect:
mock_detect.return_value = SystemInfo(
total_gb=16, available_gb=12, detection_method="mock"
)
result = auto_select(model_size_gb=10.0)
# 12 - 10 = 2 GB overhead -> turboquant_3bit_nc (min 2 GB)
assert result.preset == "turboquant_3bit_nc"
# ── Format ────────────────────────────────────────────────────────────────────
class TestFormat:
"""Test formatting utilities."""
def test_format_env_commands(self):
result = select_preset(available_gb=20, model_size_gb=10)
output = format_env_commands(result)
assert "export TURBO_LAYER_ADAPTIVE=7" in output
assert "turboquant_k8v4" in output
def test_format_env_commands_with_warnings(self):
result = select_preset(available_gb=5, model_size_gb=4)
output = format_env_commands(result)
assert "WARNING" in output
# ── Integration with issue spec ───────────────────────────────────────────────
class TestIssueSpec:
"""Verify implementation matches the exact logic from issue #97."""
def test_overhead_8gb_plus(self):
"""overhead >= 8 -> turboquant_k8v4"""
result = select_preset(available_gb=15, model_size_gb=7) # 8 GB overhead
assert result.preset == "turboquant_k8v4"
def test_overhead_4_to_8(self):
"""4 <= overhead < 8 -> turboquant_4bit_nc"""
result = select_preset(available_gb=11, model_size_gb=7) # 4 GB overhead
assert result.preset == "turboquant_4bit_nc"
def test_overhead_2_to_4(self):
"""2 <= overhead < 4 -> turboquant_3bit_nc"""
result = select_preset(available_gb=9, model_size_gb=7) # 2 GB overhead
assert result.preset == "turboquant_3bit_nc"
def test_overhead_under_2(self):
"""overhead < 2 -> q4_0"""
result = select_preset(available_gb=8, model_size_gb=7) # 1 GB overhead
assert result.preset == "q4_0"

View File

@@ -1,163 +0,0 @@
#!/usr/bin/env python3
"""Tests for quant_selector.py"""
import sys
import os
import pytest
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from evolution.quant_selector import (
QuantLevel,
HardwareInfo,
QUANT_LEVELS,
detect_hardware,
estimate_kv_cache_gb,
estimate_model_memory_gb,
select_quant_level,
)
class TestQuantLevels:
def test_levels_ordered_by_quality(self):
"""Levels should be ordered from best quality to most aggressive."""
for i in range(len(QUANT_LEVELS) - 1):
assert QUANT_LEVELS[i].bits_per_channel > QUANT_LEVELS[i + 1].bits_per_channel
def test_all_levels_have_required_fields(self):
for level in QUANT_LEVELS:
assert level.name
assert level.bits_per_channel > 0
assert level.compression_ratio > 1
assert level.quality_label
assert level.layer_adaptive >= 0
assert level.kv_type
class TestKVEstimate:
def test_basic_estimate(self):
# 48 layers, 8 heads, 128 dim, 32K context, 3.5 bits
kv_gb = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
assert kv_gb > 0
assert kv_gb < 10 # Should be reasonable
def test_longer_context_larger(self):
kv_32k = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
kv_128k = estimate_kv_cache_gb(131072, 48, 8, 128, 3.5)
assert kv_128k > kv_32k
def test_higher_bits_larger(self):
kv_4b = estimate_kv_cache_gb(32768, 48, 8, 128, 4.0)
kv_2b = estimate_kv_cache_gb(32768, 48, 8, 128, 2.0)
assert kv_4b > kv_2b
class TestHardwareDetection:
def test_detect_returns_info(self):
hw = detect_hardware()
assert hw.total_memory_gb > 0
assert hw.available_memory_gb > 0
assert hw.detection_method
@patch("evolution.quant_selector.platform.system", return_value="Linux")
@patch("builtins.open", create=True)
def test_linux_detection(self, mock_open, mock_system):
mock_open.return_value.__enter__().read.return_value = (
"MemTotal: 32000000 kB\n"
"MemAvailable: 24000000 kB\n"
)
hw = _detect_linux_fallback()
assert hw.total_memory_gb > 20
def _detect_linux_fallback():
"""Helper to test Linux detection with mocked /proc/meminfo."""
from evolution.quant_selector import _detect_linux
return _detect_linux()
class TestSelection:
def test_selects_turbo4_for_large_memory(self):
"""With plenty of memory, should pick turbo4 (best quality)."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
gpu_memory_gb=64,
gpu_name="Test GPU",
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert sel.level.name == "turbo4"
assert sel.headroom_gb > 0
def test_selects_smaller_for_tight_memory(self):
"""With tight memory, should pick a smaller quant."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=16,
available_memory_gb=12,
gpu_memory_gb=16,
gpu_name="Test GPU",
cpu_cores=8,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=131072)
# Should pick a smaller quant for 128K context on 16GB
assert sel.level.bits_per_channel <= 4.0
def test_preferred_level(self):
"""User can force a specific level."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(
model_size_gb=14.0, context_length=32768,
preferred_level="turbo2"
)
assert sel.level.name == "turbo2"
def test_env_vars_populated(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert "TURBO_LAYER_ADAPTIVE" in sel.env_vars
assert "-ctk" in sel.server_flags
assert "-ctv" in sel.server_flags
def test_warnings_on_low_headroom(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=18,
available_memory_gb=14,
gpu_memory_gb=18,
gpu_name="Test GPU",
cpu_cores=8,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=16.0, context_length=65536)
assert len(sel.warnings) > 0
def test_reasoning_contains_key_info(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=32,
available_memory_gb=24,
is_apple_silicon=True,
chip_name="M4 Max",
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert "turbo4" in sel.reasoning
assert "M4 Max" in sel.reasoning or "32GB" in sel.reasoning

View File

@@ -1,338 +0,0 @@
"""
Integration test: turboquant compressed model passes hermes tool calls (issue #82).
Validates that a TurboQuant-compressed model can:
1. Parse hermes tool schemas correctly
2. Format tool calls in OpenAI-compatible format
3. Pass through the hermes agent conversation loop
Tests are structured as contract tests -- they validate the schema/format
compatibility without requiring a running model server. The live inference
test is skipped by default (requires llama-server with TurboQuant model).
Usage:
pytest tests/test_tool_call_integration.py -v
pytest tests/test_tool_call_integration.py -v -k live # run live test if server available
"""
import json
import os
import pathlib
import re
import unittest
import pytest
ROOT = pathlib.Path(__file__).resolve().parents[1]
PROFILE_PATH = ROOT / "profiles" / "hermes-profile-gemma4-turboquant.yaml"
BENCHMARKS_DIR = ROOT / "benchmarks"
class TestHermesProfileSchema(unittest.TestCase):
"""Validate the hermes profile YAML has required fields for tool calling."""
@classmethod
def setUpClass(cls):
import yaml
cls.profile = yaml.safe_load(PROFILE_PATH.read_text())
def test_profile_has_providers(self):
assert "providers" in self.profile, "Profile must define providers"
assert "primary" in self.profile["providers"], "Must have primary provider"
def test_primary_provider_has_endpoint(self):
primary = self.profile["providers"]["primary"]
assert "endpoint" in primary, "Primary provider must have endpoint"
assert primary["endpoint"].startswith("http"), "Endpoint must be HTTP(S) URL"
def test_primary_provider_has_api_path(self):
primary = self.profile["providers"]["primary"]
assert "api_path" in primary, "Primary provider must have api_path"
assert "/chat/completions" in primary["api_path"], (
"api_path should be OpenAI-compatible /chat/completions"
)
def test_turboquant_settings_present(self):
primary = self.profile["providers"]["primary"]
assert "turboquant" in primary, "Must have turboquant config section"
tq = primary["turboquant"]
assert tq.get("enabled") is True, "TurboQuant must be enabled"
assert tq.get("kv_type") in ("turbo2", "turbo3", "turbo4"), (
"kv_type must be turbo2, turbo3, or turbo4"
)
def test_context_window_configured(self):
primary = self.profile["providers"]["primary"]
assert "context" in primary, "Must have context config"
ctx = primary["context"]
assert ctx.get("max_tokens", 0) >= 8192, (
"max_tokens should be >= 8192 for TurboQuant value proposition"
)
class TestToolSchemaCompatibility(unittest.TestCase):
"""Verify hermes tool schemas serialize to valid JSON for OpenAI tool_calls."""
SAMPLE_TOOL_SCHEMAS = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a text file with line numbers.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"offset": {"type": "integer", "default": 1},
"limit": {"type": "integer", "default": 500},
},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "execute_code",
"description": "Run a Python script.",
"parameters": {
"type": "object",
"properties": {
"code": {"type": "string", "description": "Python code"},
},
"required": ["code"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer", "default": 5},
},
"required": ["query"],
},
},
},
]
def test_tool_schemas_serialize_to_json(self):
"""Tool schemas must serialize without errors."""
serialized = json.dumps(self.SAMPLE_TOOL_SCHEMAS)
assert len(serialized) > 0
parsed = json.loads(serialized)
assert len(parsed) == len(self.SAMPLE_TOOL_SCHEMAS)
def test_tool_schemas_have_required_openai_fields(self):
"""Each tool schema must have the fields OpenAI expects."""
for tool in self.SAMPLE_TOOL_SCHEMAS:
assert tool["type"] == "function", "Tool type must be 'function'"
fn = tool["function"]
assert "name" in fn, "Function must have name"
assert "description" in fn, "Function must have description"
assert "parameters" in fn, "Function must have parameters"
params = fn["parameters"]
assert params["type"] == "object", "Parameters type must be 'object'"
assert "properties" in params, "Parameters must have properties"
def test_tool_call_response_format(self):
"""Verify tool_call response matches OpenAI format."""
tool_call = {
"id": "call_abc123",
"type": "function",
"function": {
"name": "read_file",
"arguments": json.dumps({"path": "/tmp/test.txt"}),
},
}
args = json.loads(tool_call["function"]["arguments"])
assert args["path"] == "/tmp/test.txt"
assert tool_call["function"]["name"] in [
t["function"]["name"] for t in self.SAMPLE_TOOL_SCHEMAS
]
def test_tool_names_are_valid_identifiers(self):
"""Tool names must be valid Python identifiers for hermes dispatch."""
for tool in self.SAMPLE_TOOL_SCHEMAS:
name = tool["function"]["name"]
assert re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name), (
f"Tool name \'{name}\' is not a valid identifier"
)
class TestTurboquantServerConfig(unittest.TestCase):
"""Validate server startup configuration matches hermes profile."""
def test_server_command_has_turboquant_flags(self):
"""The server command in the profile must include -ctk/-ctv flags."""
profile_text = PROFILE_PATH.read_text()
assert "-ctk" in profile_text, "Profile server command must include -ctk flag"
assert "-ctv" in profile_text, "Profile server command must include -ctv flag"
def test_server_command_has_context_flag(self):
"""Server command must set context size."""
profile_text = PROFILE_PATH.read_text()
assert re.search(r"-c\s+\d+", profile_text), (
"Server command must include -c <context_size> flag"
)
def test_layer_adaptive_env_var(self):
"""Profile must set TURBO_LAYER_ADAPTIVE env var."""
profile_text = PROFILE_PATH.read_text()
assert "TURBO_LAYER_ADAPTIVE" in profile_text, (
"Profile must configure TURBO_LAYER_ADAPTIVE"
)
class TestBenchmarkData(unittest.TestCase):
"""Validate benchmark test prompts include tool-call test cases."""
@classmethod
def setUpClass(cls):
prompts_path = BENCHMARKS_DIR / "test_prompts.json"
cls.prompts = json.loads(prompts_path.read_text())
def test_has_tool_call_test_prompt(self):
"""Benchmark prompts must include a tool-call format test."""
categories = [p.get("category") for p in self.prompts]
assert "tool_call_format" in categories, (
"Benchmark must include a tool_call_format test case"
)
def test_tool_call_prompt_expects_json(self):
"""Tool call test prompt must expect JSON in the response."""
tool_prompt = next(
p for p in self.prompts if p.get("category") == "tool_call_format"
)
pattern = tool_prompt.get("expected_pattern", "")
assert "json" in pattern.lower() or "\\{" in pattern, (
"Tool call prompt must expect JSON-formatted response"
)
@pytest.mark.skipif(
not os.environ.get("TURBOQUANT_SERVER_URL"),
reason="No TurboQuant server available (set TURBOQUANT_SERVER_URL to run)",
)
class TestLiveToolCallIntegration:
"""Live integration test -- requires running llama-server with TurboQuant."""
def test_server_health(self):
"""Server must respond to /v1/models endpoint."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
resp = requests.get(f"{url}/v1/models", timeout=10)
assert resp.status_code == 200
data = resp.json()
assert "data" in data
assert len(data["data"]) > 0
def test_tool_call_completion(self):
"""Model must return a valid tool_call for a read_file prompt."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
}
]
resp = requests.post(
f"{url}/v1/chat/completions",
json={
"model": "gemma-4",
"messages": [
{"role": "user", "content": "Read the file at /tmp/test.txt"}
],
"tools": tools,
"tool_choice": "auto",
},
timeout=120,
)
assert resp.status_code == 200
data = resp.json()
choice = data["choices"][0]
msg = choice["message"]
if "tool_calls" in msg and msg["tool_calls"]:
tc = msg["tool_calls"][0]
assert tc["type"] == "function"
assert tc["function"]["name"] == "read_file"
args = json.loads(tc["function"]["arguments"])
assert "path" in args
else:
assert len(msg.get("content", "")) > 0
def test_tool_call_with_multiple_tools(self):
"""Model must handle multiple available tools."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web",
"parameters": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "execute_code",
"description": "Run Python code",
"parameters": {
"type": "object",
"properties": {"code": {"type": "string"}},
"required": ["code"],
},
},
},
]
resp = requests.post(
f"{url}/v1/chat/completions",
json={
"model": "gemma-4",
"messages": [
{"role": "user", "content": "Search the web for 'bitcoin price'"}
],
"tools": tools,
"tool_choice": "auto",
},
timeout=120,
)
assert resp.status_code == 200
data = resp.json()
assert "choices" in data
assert len(data["choices"]) > 0
if __name__ == "__main__":
unittest.main()

View File

@@ -1,23 +0,0 @@
"""TurboQuant — Auto-configuration and preset selection for KV cache compression."""
from turboquant.auto_select import (
auto_select,
select_preset,
list_presets,
get_preset_info,
SystemInfo,
SelectionResult,
PRESETS,
QUALITY_ORDER,
)
__all__ = [
"auto_select",
"select_preset",
"list_presets",
"get_preset_info",
"SystemInfo",
"SelectionResult",
"PRESETS",
"QUALITY_ORDER",
]

View File

@@ -1,557 +0,0 @@
#!/usr/bin/env python3
"""
TurboQuant Auto-Select — Choose optimal KV cache preset based on available memory.
Implements the config helper from issue #97: detects available memory and
selects the best TurboQuant preset for the deployment environment.
Presets map to deployment configurations (not raw quant levels):
turboquant_k8v4 — 8+ GB overhead, best quality (k/v asymmetric: 8-bit K, 4-bit V)
turboquant_4bit_nc — 4-8 GB overhead, good quality (4-bit, no calibration)
turboquant_3bit_nc — 2-4 GB overhead, usable quality (3-bit, no calibration)
q4_0 — <2 GB overhead, GGUF fallback (no vLLM)
Usage:
from turboquant.auto_select import auto_select
result = auto_select(model_size_gb=7.0)
print(f"Selected: {result.preset}")
CLI:
python3 turboquant/auto_select.py --model-size 7.0
python3 turboquant/auto_select.py --model-size 7.0 --json
python3 turboquant/auto_select.py --list
"""
import json
import logging
import os
import platform
import subprocess
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
# ── Preset Definitions ────────────────────────────────────────────────────────
PRESETS: Dict[str, Dict[str, Any]] = {
"turboquant_k8v4": {
"name": "TurboQuant K8V4",
"description": "Best quality — asymmetric 8-bit K, 4-bit V",
"min_overhead_gb": 8,
"compression_ratio": 2.6,
"quality": "best",
"vllm_compatible": True,
"kv_type": "turbo4",
"layer_adaptive": 7,
},
"turboquant_4bit_nc": {
"name": "TurboQuant 4-bit NC",
"description": "Good quality — 4-bit, no calibration",
"min_overhead_gb": 4,
"compression_ratio": 3.8,
"quality": "good",
"vllm_compatible": True,
"kv_type": "turbo4",
"layer_adaptive": 5,
},
"turboquant_3bit_nc": {
"name": "TurboQuant 3-bit NC",
"description": "Usable quality — 3-bit, no calibration",
"min_overhead_gb": 2,
"compression_ratio": 4.9,
"quality": "usable",
"vllm_compatible": True,
"kv_type": "turbo3",
"layer_adaptive": 3,
},
"q4_0": {
"name": "Q4_0 GGUF",
"description": "GGUF fallback — no TurboQuant, no vLLM",
"min_overhead_gb": 0,
"compression_ratio": 4.0,
"quality": "basic",
"vllm_compatible": False,
"kv_type": "q4_0",
"layer_adaptive": 0,
},
}
# Ordered from best quality to most aggressive
QUALITY_ORDER = ["turboquant_k8v4", "turboquant_4bit_nc", "turboquant_3bit_nc", "q4_0"]
# ── Data Classes ──────────────────────────────────────────────────────────────
@dataclass
class SystemInfo:
"""Detected system resources."""
total_gb: float
available_gb: float
gpu_memory_gb: Optional[float] = None
gpu_name: Optional[str] = None
is_apple_silicon: bool = False
detection_method: str = ""
@classmethod
def detect(cls) -> "SystemInfo":
"""Detect available memory. Uses platform-appropriate methods."""
system = platform.system()
if system == "Darwin":
return cls._detect_macos()
elif system == "Linux":
return cls._detect_linux()
else:
return cls._detect_fallback(system)
@classmethod
def _detect_macos(cls) -> "SystemInfo":
"""Detect Apple Silicon unified memory."""
info = cls(total_gb=0, available_gb=0, is_apple_silicon=True, detection_method="sysctl")
try:
r = subprocess.run(["sysctl", "-n", "hw.memsize"], capture_output=True, text=True, timeout=5)
if r.returncode == 0:
info.total_gb = int(r.stdout.strip()) / (1024 ** 3)
r = subprocess.run(["vm_stat"], capture_output=True, text=True, timeout=5)
if r.returncode == 0:
page_size = 4096
free_pages = 0
for line in r.stdout.split("\n"):
if "Pages free:" in line:
try:
free_pages = int(line.split(":")[-1].strip().rstrip("."))
except ValueError:
pass
info.available_gb = (free_pages * page_size) / (1024 ** 3)
if info.available_gb < 1:
info.available_gb = info.total_gb * 0.70
info.gpu_memory_gb = info.total_gb # Unified memory
r = subprocess.run(
["system_profiler", "SPDisplaysDataType"],
capture_output=True, text=True, timeout=10
)
if r.returncode == 0:
for line in r.stdout.split("\n"):
if "Chipset" in line or "GPU" in line:
info.gpu_name = line.split(":")[-1].strip()
break
except Exception as e:
logger.warning("macOS detection failed: %s", e)
info.total_gb = 16.0
info.available_gb = 12.0
info.detection_method = "fallback"
return info
@classmethod
def _detect_linux(cls) -> "SystemInfo":
"""Detect Linux with optional NVIDIA GPU."""
info = cls(total_gb=0, available_gb=0, detection_method="proc")
try:
with open("/proc/meminfo") as f:
for line in f:
if line.startswith("MemTotal:"):
info.total_gb = int(line.split()[1]) / (1024 * 1024)
elif line.startswith("MemAvailable:"):
info.available_gb = int(line.split()[1]) / (1024 * 1024)
try:
r = subprocess.run(
["nvidia-smi", "--query-gpu=name,memory.total,memory.free",
"--format=csv,noheader,nounits"],
capture_output=True, text=True, timeout=10
)
if r.returncode == 0 and r.stdout.strip():
parts = r.stdout.strip().split("\n")[0].split(", ")
if len(parts) >= 3:
info.gpu_name = parts[0].strip()
info.gpu_memory_gb = float(parts[1]) / 1024
info.available_gb = max(info.available_gb, float(parts[2]) / 1024)
info.detection_method = "nvidia-smi"
except (FileNotFoundError, subprocess.TimeoutExpired):
pass
except Exception as e:
logger.warning("Linux detection failed: %s", e)
info.total_gb = 16.0
info.available_gb = 12.0
info.detection_method = "fallback"
return info
@classmethod
def _detect_fallback(cls, system: str) -> "SystemInfo":
"""Fallback for unknown systems. Tries psutil."""
try:
import psutil
mem = psutil.virtual_memory()
return cls(
total_gb=round(mem.total / (1024 ** 3), 1),
available_gb=round(mem.available / (1024 ** 3), 1),
detection_method="psutil",
)
except ImportError:
return cls(total_gb=16.0, available_gb=12.0, detection_method="fallback")
@dataclass
class SelectionResult:
"""Result of preset selection."""
preset: str
reason: str
overhead_gb: float
quality: str
compression_ratio: float
vllm_compatible: bool
kv_type: str
layer_adaptive: int
env_vars: Dict[str, str] = field(default_factory=dict)
server_flags: Dict[str, str] = field(default_factory=dict)
warnings: list = field(default_factory=list)
system_info: Optional[SystemInfo] = None
def to_dict(self) -> dict:
d = {
"preset": self.preset,
"reason": self.reason,
"overhead_gb": round(self.overhead_gb, 2),
"quality": self.quality,
"compression_ratio": self.compression_ratio,
"vllm_compatible": self.vllm_compatible,
"kv_type": self.kv_type,
"layer_adaptive": self.layer_adaptive,
"env_vars": self.env_vars,
"server_flags": self.server_flags,
"warnings": self.warnings,
}
if self.system_info:
d["system"] = {
"total_gb": round(self.system_info.total_gb, 1),
"available_gb": round(self.system_info.available_gb, 1),
"gpu_name": self.system_info.gpu_name,
"is_apple_silicon": self.system_info.is_apple_silicon,
"detection_method": self.system_info.detection_method,
}
return d
# ── Selection Logic ───────────────────────────────────────────────────────────
def select_preset(
available_gb: float,
model_size_gb: float,
require_vllm: bool = False,
) -> SelectionResult:
"""Select the best TurboQuant preset based on memory overhead.
Args:
available_gb: Available system memory in GB
model_size_gb: Model size in GB
require_vllm: If True, only select vLLM-compatible presets
Returns:
SelectionResult with chosen preset and reasoning
"""
overhead_gb = available_gb - model_size_gb
if overhead_gb < 0:
logger.warning(
"Insufficient memory: need %.1f GB, have %.1f GB available",
model_size_gb, available_gb
)
return _make_result("q4_0", overhead_gb,
reason=f"Insufficient memory ({overhead_gb:.1f} GB deficit), using GGUF fallback")
# Walk quality order (best first), pick first that fits
for preset_name in QUALITY_ORDER:
preset = PRESETS[preset_name]
if require_vllm and not preset["vllm_compatible"]:
continue
if overhead_gb >= preset["min_overhead_gb"]:
reason = (
f"Overhead {overhead_gb:.1f} GB >= {preset['min_overhead_gb']} GB "
f"required for {preset['name']}"
)
return _make_result(preset_name, overhead_gb, reason=reason)
# Nothing fits — aggressive fallback
return _make_result("q4_0", overhead_gb,
reason=f"Overhead {overhead_gb:.1f} GB too low for TurboQuant, using GGUF fallback")
def _make_result(preset_name: str, overhead_gb: float, reason: str) -> SelectionResult:
"""Build a SelectionResult from a preset name."""
p = PRESETS[preset_name]
env_vars = {"TURBO_LAYER_ADAPTIVE": str(p["layer_adaptive"])}
server_flags = {"-ctk": p["kv_type"], "-ctv": p["kv_type"]}
warnings = []
if overhead_gb < 2.0 and overhead_gb >= 0:
warnings.append(f"Low headroom ({overhead_gb:.1f} GB). Consider reducing context length.")
if overhead_gb < 0:
warnings.append(
f"OVERCOMMITTED: model needs {abs(overhead_gb):.1f} GB more than available. "
f"Inference may fail or swap heavily."
)
return SelectionResult(
preset=preset_name,
reason=reason,
overhead_gb=overhead_gb,
quality=p["quality"],
compression_ratio=p["compression_ratio"],
vllm_compatible=p["vllm_compatible"],
kv_type=p["kv_type"],
layer_adaptive=p["layer_adaptive"],
env_vars=env_vars,
server_flags=server_flags,
warnings=warnings,
)
# ── Auto-Select (with system detection + config) ─────────────────────────────
def auto_select(
model_size_gb: float,
config_override: Optional[str] = None,
require_vllm: bool = False,
config_path: Optional[str] = None,
) -> SelectionResult:
"""Auto-select preset based on system detection.
Args:
model_size_gb: Model size in GB
config_override: Force a specific preset (skips detection)
require_vllm: Require vLLM compatibility
config_path: Path to config file (default: $HERMES_HOME/turboquant.json)
Returns:
SelectionResult
"""
# Load config file if exists
config = _load_config(config_path)
# Config override takes precedence
override = config_override or config.get("preset_override")
if override:
if override in PRESETS:
logger.info("Using config override: %s", override)
p = PRESETS[override]
return SelectionResult(
preset=override,
reason=f"Config override: {p['name']}",
overhead_gb=0,
quality=p["quality"],
compression_ratio=p["compression_ratio"],
vllm_compatible=p["vllm_compatible"],
kv_type=p["kv_type"],
layer_adaptive=p["layer_adaptive"],
env_vars={"TURBO_LAYER_ADAPTIVE": str(p["layer_adaptive"])},
server_flags={"-ctk": p["kv_type"], "-ctv": p["kv_type"]},
)
else:
logger.warning("Unknown preset override '%s', falling back to auto-select", override)
# Detect system
sys_info = SystemInfo.detect()
logger.info(
"System: %.1f GB total, %.1f GB available, model: %.1f GB",
sys_info.total_gb, sys_info.available_gb, model_size_gb
)
# Select
result = select_preset(
available_gb=sys_info.available_gb,
model_size_gb=model_size_gb,
require_vllm=require_vllm,
)
result.system_info = sys_info
# Apply context length from config
context_length = config.get("context_length")
if context_length:
result.server_flags["-c"] = str(context_length)
return result
def _load_config(config_path: Optional[str] = None) -> dict:
"""Load turboquant config from JSON file."""
if config_path:
p = Path(config_path)
else:
try:
from hermes_constants import get_hermes_home
p = get_hermes_home() / "turboquant.json"
except (ImportError, Exception):
p = Path.home() / ".hermes" / "turboquant.json"
if p.exists():
try:
return json.loads(p.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
pass
return {}
def save_config(values: dict, config_path: Optional[str] = None) -> None:
"""Save turboquant config to JSON file."""
if config_path:
p = Path(config_path)
else:
try:
from hermes_constants import get_hermes_home
p = get_hermes_home() / "turboquant.json"
except (ImportError, Exception):
p = Path.home() / ".hermes" / "turboquant.json"
p.parent.mkdir(parents=True, exist_ok=True)
existing = {}
if p.exists():
try:
existing = json.loads(p.read_text(encoding="utf-8"))
except (json.JSONDecodeError, OSError):
pass
existing.update(values)
p.write_text(json.dumps(existing, indent=2), encoding="utf-8")
logger.info("Config saved to %s", p)
# ── Utility ───────────────────────────────────────────────────────────────────
def get_preset_info(preset_name: str) -> Optional[dict]:
"""Get information about a preset."""
return PRESETS.get(preset_name)
def list_presets() -> dict:
"""List all available presets."""
return PRESETS.copy()
def format_env_commands(result: SelectionResult) -> str:
"""Format result as shell export commands."""
lines = [f"# TurboQuant preset: {result.preset} ({result.quality}, {result.compression_ratio}x)"]
lines.append(f"# {result.reason}")
for k, v in result.env_vars.items():
lines.append(f"export {k}={v}")
flags = " ".join(f"{k} {v}" for k, v in result.server_flags.items())
if flags:
lines.append(f"# llama-server flags: {flags}")
for w in result.warnings:
lines.append(f"# WARNING: {w}")
return "\n".join(lines)
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
"""CLI entry point."""
import argparse
parser = argparse.ArgumentParser(
description="Auto-select TurboQuant preset based on available memory",
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
Examples:
%(prog)s --model-size 14.0 # Auto-detect and select
%(prog)s --model-size 7.0 --json # JSON output
%(prog)s --model-size 7.0 --list # List available presets
%(prog)s --model-size 7.0 --require-vllm
%(prog)s --model-size 7.0 --preset turboquant_4bit_nc # Override
""",
)
parser.add_argument("--model-size", type=float, help="Model size in GB")
parser.add_argument("--preset", help="Force a specific preset (skip detection)")
parser.add_argument("--require-vllm", action="store_true", help="Require vLLM compatibility")
parser.add_argument("--json", action="store_true", help="JSON output")
parser.add_argument("--list", action="store_true", help="List available presets")
parser.add_argument("--shell", action="store_true", help="Output as shell export commands")
parser.add_argument("--detect-only", action="store_true", help="Only detect hardware")
parser.add_argument("--config", help="Path to config file")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(message)s")
if args.list:
print("Available presets:")
print(f" {'Name':22} {'Quality':8} {'Compress':8} {'vLLM':5} {'Overhead':8} Description")
for name in QUALITY_ORDER:
p = PRESETS[name]
vllm = "yes" if p["vllm_compatible"] else "no"
print(
f" {name:22} {p['quality']:8} {p['compression_ratio']:.1f}x "
f"{vllm:5} >= {p['min_overhead_gb']:2.0f} GB {p['description']}"
)
return
if args.detect_only:
info = SystemInfo.detect()
if args.json:
print(json.dumps({
"total_gb": round(info.total_gb, 1),
"available_gb": round(info.available_gb, 1),
"gpu_memory_gb": round(info.gpu_memory_gb, 1) if info.gpu_memory_gb else None,
"gpu_name": info.gpu_name,
"is_apple_silicon": info.is_apple_silicon,
"detection_method": info.detection_method,
}, indent=2))
else:
print(f"Total memory: {info.total_gb:.1f} GB")
print(f"Available: {info.available_gb:.1f} GB")
if info.gpu_memory_gb:
print(f"GPU memory: {info.gpu_memory_gb:.1f} GB")
if info.gpu_name:
print(f"GPU: {info.gpu_name}")
if info.is_apple_silicon:
print(f"Chip: Apple Silicon")
print(f"Detection: {info.detection_method}")
return
if not args.model_size:
parser.error("--model-size is required (unless using --list or --detect-only)")
result = auto_select(
model_size_gb=args.model_size,
config_override=args.preset,
require_vllm=args.require_vllm,
config_path=args.config,
)
if args.json:
print(json.dumps(result.to_dict(), indent=2))
elif args.shell:
print(format_env_commands(result))
else:
print(f"Selected: {result.preset} ({result.quality}, {result.compression_ratio}x)")
print(f" {result.reason}")
if result.system_info:
print(f" System: {result.system_info.total_gb:.0f} GB total, {result.system_info.available_gb:.0f} GB available")
print(f" Overhead: {result.overhead_gb:.1f} GB")
print()
print(f"Environment:")
for k, v in result.env_vars.items():
print(f" export {k}={v}")
print(f"Server flags:")
for k, v in result.server_flags.items():
print(f" {k} {v}")
if result.warnings:
print()
for w in result.warnings:
print(f" WARNING: {w}")
if __name__ == "__main__":
main()