"""Auto-select TurboQuant compression level based on available VRAM/RAM.

Detects hardware resources at startup and picks the highest quality
quantization level that fits within available memory. Supports Apple
Silicon unified memory, NVIDIA GPUs (via nvidia-smi), and CPU-only fallback.

Usage:
    from evolution.quant_selector import select_quant_level

    selection = select_quant_level(model_size_gb=14.0, context_length=32768)
    print(selection.level)      # "turbo4"
    print(selection.reasoning)  # "M4 Max 36GB unified: turbo4 fits 14.0GB model + ..."
    print(selection.env_vars)   # {"TURBO_LAYER_ADAPTIVE": "7"}
"""

import logging
import os
import platform
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)


# ── Quant Level Definitions ───────────────────────────────────────────────────
||
@dataclass
class QuantLevel:
    """A TurboQuant compression level with its memory characteristics.

    Instances are declared in QUANT_LEVELS; selection compares
    min_memory_headroom_gb against the memory left after budgeting
    model weights + KV cache.
    """
    name: str                       # e.g. "turbo4"
    bits_per_channel: float         # e.g. 3.5 for turbo4
    compression_ratio: float        # vs uncompressed KV cache
    quality_label: str              # "best", "high", "balanced", "fast"
    layer_adaptive: int             # TURBO_LAYER_ADAPTIVE value (0-7)
    kv_type: str                    # -ctk/-ctv flag value
    min_memory_headroom_gb: float   # Minimum free memory to recommend this level
    description: str = ""           # Human-readable summary of the level
|
||
|
||
|
||
# Ordered from highest quality to most aggressive compression.
# select_quant_level() walks this list top-down and takes the first level
# whose headroom requirement is satisfied — order matters.
QUANT_LEVELS = [
    QuantLevel(
        name="turbo4",
        bits_per_channel=3.5,
        compression_ratio=4.2,
        quality_label="best",
        layer_adaptive=7,
        kv_type="turbo4",
        min_memory_headroom_gb=4.0,
        description="PolarQuant + QJL 4-bit. Best quality, ~4.2x KV compression."
    ),
    QuantLevel(
        name="turbo3",
        bits_per_channel=2.5,
        compression_ratio=6.0,
        quality_label="high",
        layer_adaptive=5,
        kv_type="turbo3",
        min_memory_headroom_gb=3.0,
        description="3-bit TurboQuant. High quality, ~6x KV compression."
    ),
    QuantLevel(
        name="turbo2",
        bits_per_channel=1.5,
        compression_ratio=10.0,
        quality_label="balanced",
        layer_adaptive=3,
        kv_type="turbo2",
        min_memory_headroom_gb=2.0,
        description="2-bit TurboQuant. Balanced, ~10x KV compression."
    ),
    # Last entry doubles as the "nothing fits" fallback in select_quant_level().
    QuantLevel(
        name="q4_0",
        bits_per_channel=4.0,
        compression_ratio=3.5,
        quality_label="fast",
        layer_adaptive=0,
        kv_type="q4_0",
        min_memory_headroom_gb=1.5,
        description="Standard 4-bit quant. Fast fallback, no TurboQuant."
    ),
]
|
||
|
||
|
||
# ── Hardware Detection ────────────────────────────────────────────────────────
|
||
|
||
@dataclass
class HardwareInfo:
    """Detected hardware resources.

    Populated by detect_hardware() and its platform-specific helpers.
    """
    total_memory_gb: float                  # Total system (or unified) memory in GB
    available_memory_gb: float              # Estimated currently free memory in GB
    gpu_memory_gb: Optional[float] = None   # GPU VRAM; equals total on Apple Silicon
    gpu_name: Optional[str] = None          # From nvidia-smi or system_profiler
    is_apple_silicon: bool = False          # Set by the Darwin detection path
    chip_name: Optional[str] = None         # CPU brand string (macOS sysctl)
    cpu_cores: int = 0                      # Logical core count
    detection_method: str = ""              # "sysctl"/"proc"/"nvidia-smi"/"psutil"/"fallback"
|
||
|
||
|
||
def detect_hardware() -> HardwareInfo:
    """Detect available memory and GPU resources for the current platform.

    Dispatches to the macOS, Linux, or generic (psutil-based) detector
    based on platform.system().
    """
    current_os = platform.system()
    if current_os == "Darwin":
        return _detect_apple_silicon()
    if current_os == "Linux":
        return _detect_linux()
    return _detect_generic(current_os)
|
||
|
||
|
||
def _detect_apple_silicon() -> HardwareInfo:
    """Detect Apple Silicon unified memory.

    Uses sysctl for total memory / chip name / core count, system_profiler
    for the GPU name, and vm_stat for a conservative free-memory estimate.
    Any failure falls back to 16 GB total / 12 GB available defaults.
    """
    info = HardwareInfo(
        total_memory_gb=0,
        available_memory_gb=0,
        is_apple_silicon=True,
        detection_method="sysctl",
    )

    def _run(cmd, timeout=5):
        """Run a command; return stripped stdout on success, else None."""
        result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        return result.stdout.strip() if result.returncode == 0 else None

    try:
        # Get total memory (bytes)
        out = _run(["sysctl", "-n", "hw.memsize"])
        if out is not None:
            info.total_memory_gb = int(out) / (1024**3)

        # Get chip name
        out = _run(["sysctl", "-n", "machdep.cpu.brand_string"])
        if out is not None:
            info.chip_name = out

        # Try to get GPU name (Apple Silicon)
        out = _run(["system_profiler", "SPDisplaysDataType"], timeout=10)
        if out is not None:
            for line in out.split("\n"):
                if "Chipset" in line or "GPU" in line:
                    info.gpu_name = line.split(":")[-1].strip()
                    break

        # Estimate available memory (vm_stat)
        out = _run(["vm_stat"])
        if out is not None:
            # vm_stat's header line reports the actual page size
            # ("... (page size of 16384 bytes)"). Apple Silicon uses
            # 16 KiB pages, so a hardcoded 4096 would undercount free
            # memory by 4x; keep 4096 only as a last-resort default.
            page_size = 4096
            free_pages = 0
            for line in out.split("\n"):
                if "page size of" in line:
                    try:
                        page_size = int(line.split("page size of")[1].split()[0])
                    except (ValueError, IndexError):
                        pass
                elif "Pages free:" in line:
                    try:
                        free_pages = int(line.split(":")[-1].strip().rstrip("."))
                    except ValueError:
                        pass
            # Available ≈ free pages only (conservative; ignores speculative)
            info.available_memory_gb = (free_pages * page_size) / (1024**3)

        # Fallback if vm_stat parsing failed or reported < 1 GB
        if info.available_memory_gb < 1:
            # Conservative: 70% of total
            info.available_memory_gb = info.total_memory_gb * 0.70

        # Apple Silicon shares memory — GPU memory = total memory
        info.gpu_memory_gb = info.total_memory_gb

        # Detect CPU cores
        out = _run(["sysctl", "-n", "hw.ncpu"])
        if out is not None:
            info.cpu_cores = int(out)

    except Exception as e:
        logger.warning("Apple Silicon detection failed: %s", e)
        # Fallback defaults so callers always get usable numbers
        info.total_memory_gb = 16.0
        info.available_memory_gb = 12.0
        info.detection_method = "fallback"

    return info
|
||
|
||
|
||
def _detect_linux() -> HardwareInfo:
    """Detect Linux system memory via /proc/meminfo, plus optional NVIDIA GPU.

    When nvidia-smi is present, the first GPU's name/VRAM are recorded and
    its free VRAM may raise available_memory_gb (VRAM-based selection).
    Falls back to 16 GB total / 12 GB available on any unexpected failure.
    """
    info = HardwareInfo(
        total_memory_gb=0,
        available_memory_gb=0,
        detection_method="proc",
    )

    try:
        # Read /proc/meminfo (values reported in kB)
        with open("/proc/meminfo", "r") as f:
            meminfo = f.read()

        for line in meminfo.split("\n"):
            if line.startswith("MemTotal:"):
                kb = int(line.split()[1])
                info.total_memory_gb = kb / (1024 * 1024)
            elif line.startswith("MemAvailable:"):
                kb = int(line.split()[1])
                info.available_memory_gb = kb / (1024 * 1024)

        # CPU cores
        info.cpu_cores = os.cpu_count() or 1

        # Check for NVIDIA GPU. ValueError is caught here too so a
        # malformed nvidia-smi line cannot escape to the outer handler
        # and wipe the valid /proc/meminfo numbers with the fallback.
        try:
            result = subprocess.run(
                ["nvidia-smi", "--query-gpu=name,memory.total,memory.free",
                 "--format=csv,noheader,nounits"],
                capture_output=True, text=True, timeout=10
            )
            if result.returncode == 0 and result.stdout.strip():
                lines = result.stdout.strip().split("\n")
                if lines:
                    parts = lines[0].split(", ")
                    if len(parts) >= 3:
                        info.gpu_name = parts[0].strip()
                        info.gpu_memory_gb = float(parts[1]) / 1024  # MiB to GiB
                        gpu_free = float(parts[2]) / 1024
                        # Use GPU free for VRAM-based selection
                        info.available_memory_gb = max(info.available_memory_gb, gpu_free)
                        info.detection_method = "nvidia-smi"
        except (FileNotFoundError, subprocess.TimeoutExpired, ValueError):
            pass  # No NVIDIA GPU, or unparseable output

    except Exception as e:
        logger.warning("Linux detection failed: %s", e)
        info.total_memory_gb = 16.0
        info.available_memory_gb = 12.0
        info.detection_method = "fallback"

    return info
|
||
|
||
|
||
def _detect_generic(system: str) -> HardwareInfo:
    """Fallback detection for unknown systems (Windows, BSDs, ...).

    Uses psutil when installed; psutil is third-party and may be absent,
    in which case conservative 16 GB / 12 GB defaults are returned instead
    of crashing the whole selection path.
    """
    try:
        import psutil
    except ImportError:
        logger.warning("psutil unavailable on %s; using conservative defaults", system)
        return HardwareInfo(
            total_memory_gb=16.0,
            available_memory_gb=12.0,
            cpu_cores=os.cpu_count() or 1,
            detection_method="fallback",
        )

    mem = psutil.virtual_memory()
    return HardwareInfo(
        total_memory_gb=mem.total / (1024**3),
        available_memory_gb=mem.available / (1024**3),
        cpu_cores=os.cpu_count() or 1,
        detection_method="psutil",
    )
|
||
|
||
|
||
# ── KV Cache Memory Estimation ───────────────────────────────────────────────
|
||
|
||
def estimate_kv_cache_gb(
    context_length: int,
    num_layers: int = 48,
    num_kv_heads: int = 8,
    head_dim: int = 128,
    bits_per_channel: float = 3.5,
) -> float:
    """Estimate KV cache memory in GiB for the given model geometry.

    Formula: 2 (K+V) × layers × kv_heads × head_dim × context_length × bits/8
    """
    # Element count across both K and V tensors for the full context.
    elements = 2 * num_layers * num_kv_heads * head_dim * context_length
    return elements * (bits_per_channel / 8.0) / (1024**3)
|
||
|
||
|
||
def estimate_model_memory_gb(model_size_gb: float, quant_type: str = "q4_k_m") -> float:
    """Estimate model weights memory. Returns loaded size in GB.

    Args:
        model_size_gb: On-disk size of the (already quantized) model.
        quant_type: Quant format label. Currently unused — the input is
            already the quantized size — but kept for API compatibility.

    Returns:
        Estimated loaded size in GB (currently the input size, unchanged).
    """
    # The previously embedded quant_multipliers table was dead code (never
    # read); model_size_gb is already the quantized size, so return as-is.
    return model_size_gb
|
||
|
||
|
||
# ── Selection Logic ───────────────────────────────────────────────────────────
|
||
|
||
@dataclass
class QuantSelection:
    """Result of quantization level selection (built by select_quant_level)."""
    level: QuantLevel           # Chosen compression level
    hardware: HardwareInfo      # Detected hardware snapshot
    reasoning: str              # Human-readable explanation of the choice
    total_required_gb: float    # Model weights + KV cache estimate
    available_gb: float         # Memory pool budgeted against
    headroom_gb: float          # available_gb - total_required_gb
    env_vars: dict = field(default_factory=dict)      # e.g. {"TURBO_LAYER_ADAPTIVE": "7"}
    server_flags: dict = field(default_factory=dict)  # llama.cpp -ctk/-ctv/-c flags
    warnings: list = field(default_factory=list)      # Low-headroom / overcommit notes
|
||
|
||
|
||
def select_quant_level(
    model_size_gb: float = 14.0,
    context_length: int = 32768,
    num_layers: int = 48,
    num_kv_heads: int = 8,
    head_dim: int = 128,
    preferred_level: Optional[str] = None,
    force_cpu: bool = False,
) -> QuantSelection:
    """Select the best quantization level for available hardware.

    Args:
        model_size_gb: Size of the model weights in GB
        context_length: Target context length (in tokens)
        num_layers: Number of transformer layers
        num_kv_heads: Number of KV attention heads
        head_dim: Dimension per attention head
        preferred_level: Force a specific level (still checks if it fits;
            an unknown name falls through to the most aggressive level)
        force_cpu: If True, ignore GPU memory

    Returns:
        QuantSelection with the chosen level and reasoning
    """
    hw = detect_hardware()

    if force_cpu:
        hw.gpu_memory_gb = None
        hw.gpu_name = None

    # Pick the memory pool to budget against:
    #   Apple Silicon: unified memory, use total. Checked BEFORE the GPU
    #     branch — macOS detection also sets gpu_memory_gb/gpu_name, which
    #     previously made Macs report "...VRAM" instead of the intended
    #     "...GB unified" label (pool size is the same either way, since
    #     GPU memory == total memory on unified-memory machines).
    #   NVIDIA: use GPU VRAM
    #   CPU-only: use system RAM
    if hw.is_apple_silicon:
        memory_pool_gb = hw.total_memory_gb
        memory_label = f"{hw.chip_name or 'Apple Silicon'} {hw.total_memory_gb:.0f}GB unified"
    elif hw.gpu_memory_gb and hw.gpu_name:
        memory_pool_gb = hw.gpu_memory_gb
        memory_label = f"{hw.gpu_name} {hw.gpu_memory_gb:.0f}GB VRAM"
    else:
        memory_pool_gb = hw.total_memory_gb
        memory_label = f"{hw.cpu_cores}c CPU {hw.total_memory_gb:.0f}GB RAM"

    model_mem = estimate_model_memory_gb(model_size_gb)

    # Try levels from best quality to most aggressive compression and
    # take the first one whose headroom requirement is met.
    chosen = None
    for level in QUANT_LEVELS:
        if preferred_level and level.name != preferred_level:
            continue

        kv_mem = estimate_kv_cache_gb(
            context_length, num_layers, num_kv_heads, head_dim,
            level.bits_per_channel
        )
        total_required = model_mem + kv_mem
        headroom = memory_pool_gb - total_required

        if headroom >= level.min_memory_headroom_gb:
            chosen = level
            break

        if preferred_level and level.name == preferred_level:
            # User forced this level but it doesn't fit — honor the request
            # and let the warnings below flag the overcommit.
            chosen = level
            break

    if chosen is None:
        # Nothing fits — pick the most aggressive compression
        chosen = QUANT_LEVELS[-1]
        logger.warning("No quant level fits in %.1fGB. Using %s.", memory_pool_gb, chosen.name)

    # Recompute final numbers for the chosen level
    kv_mem = estimate_kv_cache_gb(
        context_length, num_layers, num_kv_heads, head_dim,
        chosen.bits_per_channel
    )
    total_required = model_mem + kv_mem
    headroom = memory_pool_gb - total_required

    # Build reasoning. context_length is a raw token count, so no "K"
    # suffix (the old format printed the nonsensical "32768K context").
    reasoning_parts = [
        f"{memory_label}:",
        f"{chosen.name} ({chosen.quality_label}, {chosen.bits_per_channel:.1f}b/ch,",
        f"{chosen.compression_ratio:.1f}x compression)",
        f"fits {model_mem:.1f}GB model + {kv_mem:.1f}GB KV cache",
        f"@ {context_length} tokens = {total_required:.1f}GB / {memory_pool_gb:.0f}GB",
        f"({headroom:.1f}GB headroom)"
    ]
    reasoning = " ".join(reasoning_parts)

    # Build environment variables for llama.cpp
    env_vars = {
        "TURBO_LAYER_ADAPTIVE": str(chosen.layer_adaptive),
    }

    # Build server flags
    server_flags = {
        "-ctk": chosen.kv_type,
        "-ctv": chosen.kv_type,
        "-c": str(context_length),
    }

    # Warnings (both fire when overcommitted: low headroom + overcommit)
    warnings = []
    if headroom < 2.0:
        warnings.append(
            f"Low headroom ({headroom:.1f}GB). Consider reducing context length or model size."
        )
    if headroom < 0:
        warnings.append(
            f"OVERCOMMITTED: needs {total_required:.1f}GB but only {memory_pool_gb:.0f}GB available. "
            f"Inference may fail or swap heavily."
        )

    selection = QuantSelection(
        level=chosen,
        hardware=hw,
        reasoning=reasoning,
        total_required_gb=total_required,
        available_gb=memory_pool_gb,
        headroom_gb=headroom,
        env_vars=env_vars,
        server_flags=server_flags,
        warnings=warnings,
    )

    logger.info("Quant selection: %s", reasoning)
    for w in warnings:
        logger.warning(w)

    return selection
|
||
|
||
|
||
# ── CLI ───────────────────────────────────────────────────────────────────────
|
||
|
||
def main():
    """CLI entry point for quant level selection.

    Prints the selected level, env vars, server flags, and warnings in a
    human-readable form, or a machine-readable document with --json.
    --detect-only skips selection and reports detected hardware only.
    """
    import argparse
    import json

    parser = argparse.ArgumentParser(
        description="Auto-select TurboQuant compression level based on available hardware"
    )
    parser.add_argument("--model-size", type=float, default=14.0,
                        help="Model size in GB (default: 14.0)")
    parser.add_argument("--context", type=int, default=32768,
                        help="Target context length (default: 32768)")
    parser.add_argument("--layers", type=int, default=48,
                        help="Number of transformer layers (default: 48)")
    parser.add_argument("--kv-heads", type=int, default=8,
                        help="Number of KV attention heads (default: 8)")
    parser.add_argument("--head-dim", type=int, default=128,
                        help="Dimension per attention head (default: 128)")
    parser.add_argument("--prefer", type=str, default=None,
                        choices=[l.name for l in QUANT_LEVELS],
                        help="Prefer a specific quant level")
    parser.add_argument("--force-cpu", action="store_true",
                        help="Ignore GPU, use CPU memory only")
    parser.add_argument("--json", action="store_true",
                        help="JSON output for automation")
    parser.add_argument("--detect-only", action="store_true",
                        help="Only detect hardware, don't select")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, format="%(message)s")

    if args.detect_only:
        hw = detect_hardware()
        if args.json:
            # default=str covers Optional fields that json can't serialize
            print(json.dumps(hw.__dict__, default=str, indent=2))
        else:
            print(f"Total memory: {hw.total_memory_gb:.1f} GB")
            print(f"Available: {hw.available_memory_gb:.1f} GB")
            if hw.gpu_memory_gb:
                print(f"GPU memory: {hw.gpu_memory_gb:.1f} GB")
            if hw.gpu_name:
                print(f"GPU: {hw.gpu_name}")
            if hw.is_apple_silicon:
                print(f"Chip: {hw.chip_name or 'Apple Silicon'}")
            print(f"CPU cores: {hw.cpu_cores}")
            print(f"Detection: {hw.detection_method}")
        return

    selection = select_quant_level(
        model_size_gb=args.model_size,
        context_length=args.context,
        num_layers=args.layers,
        num_kv_heads=args.kv_heads,
        head_dim=args.head_dim,
        preferred_level=args.prefer,
        force_cpu=args.force_cpu,
    )

    if args.json:
        result = {
            "level": selection.level.name,
            "bits_per_channel": selection.level.bits_per_channel,
            "compression_ratio": selection.level.compression_ratio,
            "quality": selection.level.quality_label,
            "reasoning": selection.reasoning,
            "total_required_gb": round(selection.total_required_gb, 2),
            "available_gb": round(selection.available_gb, 1),
            "headroom_gb": round(selection.headroom_gb, 2),
            "env_vars": selection.env_vars,
            "server_flags": selection.server_flags,
            "warnings": selection.warnings,
            "hardware": {
                "total_memory_gb": round(selection.hardware.total_memory_gb, 1),
                "gpu_name": selection.hardware.gpu_name,
                "is_apple_silicon": selection.hardware.is_apple_silicon,
                "chip_name": selection.hardware.chip_name,
                "cpu_cores": selection.hardware.cpu_cores,
            },
        }
        print(json.dumps(result, indent=2))
    else:
        print(f"Selected: {selection.level.name} ({selection.level.quality_label})")
        print(f"  {selection.reasoning}")
        print()
        # Plain strings here — these had pointless f-prefixes (ruff F541)
        print("Environment variables:")
        for k, v in selection.env_vars.items():
            print(f"  export {k}={v}")
        print()
        print("Server flags:")
        for k, v in selection.server_flags.items():
            print(f"  {k} {v}")
        if selection.warnings:
            print()
            for w in selection.warnings:
                print(f"  WARNING: {w}")
|
||
|
||
|
||
# Script entry point — only runs when executed directly, not on import.
if __name__ == "__main__":
    main()
|