"""Auto-select TurboQuant compression level based on available VRAM/RAM.

Detects hardware resources at startup and picks the highest quality
quantization level that fits within available memory. Supports Apple
Silicon unified memory, NVIDIA GPUs (via nvidia-smi), and CPU-only fallback.

Usage:
    from evolution.quant_selector import select_quant_level

    selection = select_quant_level(model_size_gb=14.0, context_length=32768)
    print(selection.level)      # "turbo4"
    print(selection.reasoning)  # "M4 Max 36GB unified: turbo4 fits 14.0GB model + ..."
    print(selection.env_vars)   # {"TURBO_LAYER_ADAPTIVE": "7"}
"""

import logging
import os
import platform
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)


# ── Quant Level Definitions ─────────────────────────────────────────────────── 

@dataclass
class QuantLevel:
    """A TurboQuant compression level with its memory characteristics.

    Attributes:
        name: Level identifier, e.g. "turbo4".
        bits_per_channel: Effective bits stored per channel, e.g. 3.5.
        compression_ratio: Compression factor vs. an uncompressed KV cache.
        quality_label: One of "best", "high", "balanced", "fast".
        layer_adaptive: Value for the TURBO_LAYER_ADAPTIVE env var (0-7).
        kv_type: Value handed to the -ctk/-ctv server flags.
        min_memory_headroom_gb: Minimum free memory required before this
            level may be recommended.
        description: Human-readable summary of the level.
    """
    name: str
    bits_per_channel: float
    compression_ratio: float
    quality_label: str
    layer_adaptive: int
    kv_type: str
    min_memory_headroom_gb: float
    description: str = ""


# Ordered from highest quality to most aggressive compression.
# Each row mirrors the QuantLevel field order exactly:
# (name, bits/ch, ratio, quality, layer_adaptive, kv_type, headroom, description)
QUANT_LEVELS = [
    QuantLevel(*row)
    for row in (
        ("turbo4", 3.5, 4.2, "best", 7, "turbo4", 4.0,
         "PolarQuant + QJL 4-bit. Best quality, ~4.2x KV compression."),
        ("turbo3", 2.5, 6.0, "high", 5, "turbo3", 3.0,
         "3-bit TurboQuant. High quality, ~6x KV compression."),
        ("turbo2", 1.5, 10.0, "balanced", 3, "turbo2", 2.0,
         "2-bit TurboQuant. Balanced, ~10x KV compression."),
        ("q4_0", 4.0, 3.5, "fast", 0, "q4_0", 1.5,
         "Standard 4-bit quant. Fast fallback, no TurboQuant."),
    )
]
+ ), + QuantLevel( + name="turbo3", + bits_per_channel=2.5, + compression_ratio=6.0, + quality_label="high", + layer_adaptive=5, + kv_type="turbo3", + min_memory_headroom_gb=3.0, + description="3-bit TurboQuant. High quality, ~6x KV compression." + ), + QuantLevel( + name="turbo2", + bits_per_channel=1.5, + compression_ratio=10.0, + quality_label="balanced", + layer_adaptive=3, + kv_type="turbo2", + min_memory_headroom_gb=2.0, + description="2-bit TurboQuant. Balanced, ~10x KV compression." + ), + QuantLevel( + name="q4_0", + bits_per_channel=4.0, + compression_ratio=3.5, + quality_label="fast", + layer_adaptive=0, + kv_type="q4_0", + min_memory_headroom_gb=1.5, + description="Standard 4-bit quant. Fast fallback, no TurboQuant." + ), +] + + +# ── Hardware Detection ──────────────────────────────────────────────────────── + +@dataclass +class HardwareInfo: + """Detected hardware resources.""" + total_memory_gb: float + available_memory_gb: float + gpu_memory_gb: Optional[float] = None + gpu_name: Optional[str] = None + is_apple_silicon: bool = False + chip_name: Optional[str] = None + cpu_cores: int = 0 + detection_method: str = "" + + +def detect_hardware() -> HardwareInfo: + """Detect available memory and GPU resources.""" + system = platform.system() + + if system == "Darwin": + return _detect_apple_silicon() + elif system == "Linux": + return _detect_linux() + else: + return _detect_generic(system) + + +def _detect_apple_silicon() -> HardwareInfo: + """Detect Apple Silicon unified memory.""" + info = HardwareInfo( + total_memory_gb=0, + available_memory_gb=0, + is_apple_silicon=True, + detection_method="sysctl", + ) + + try: + # Get total memory + result = subprocess.run( + ["sysctl", "-n", "hw.memsize"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + info.total_memory_gb = int(result.stdout.strip()) / (1024**3) + + # Get chip name + result = subprocess.run( + ["sysctl", "-n", "machdep.cpu.brand_string"], + capture_output=True, 
text=True, timeout=5 + ) + if result.returncode == 0: + info.chip_name = result.stdout.strip() + + # Try to get GPU name (Apple Silicon) + result = subprocess.run( + ["system_profiler", "SPDisplaysDataType"], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0: + for line in result.stdout.split("\n"): + if "Chipset" in line or "GPU" in line: + info.gpu_name = line.split(":")[-1].strip() + break + + # Estimate available memory (vm_stat) + result = subprocess.run( + ["vm_stat"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + page_size = 4096 # macOS default + free_pages = 0 + for line in result.stdout.split("\n"): + if "Pages free:" in line: + try: + free_pages = int(line.split(":")[-1].strip().rstrip(".")) + except ValueError: + pass + # Available ≈ free + some speculative (conservative: just free) + info.available_memory_gb = (free_pages * page_size) / (1024**3) + + # Fallback if vm_stat parsing failed + if info.available_memory_gb < 1: + # Conservative: 70% of total + info.available_memory_gb = info.total_memory_gb * 0.70 + + # Apple Silicon shares memory — GPU memory = total memory + info.gpu_memory_gb = info.total_memory_gb + + # Detect CPU cores + result = subprocess.run( + ["sysctl", "-n", "hw.ncpu"], + capture_output=True, text=True, timeout=5 + ) + if result.returncode == 0: + info.cpu_cores = int(result.stdout.strip()) + + except Exception as e: + logger.warning(f"Apple Silicon detection failed: {e}") + # Fallback + info.total_memory_gb = 16.0 + info.available_memory_gb = 12.0 + info.detection_method = "fallback" + + return info + + +def _detect_linux() -> HardwareInfo: + """Detect Linux system with optional NVIDIA GPU.""" + info = HardwareInfo( + total_memory_gb=0, + available_memory_gb=0, + detection_method="proc", + ) + + try: + # Read /proc/meminfo + with open("/proc/meminfo", "r") as f: + meminfo = f.read() + + for line in meminfo.split("\n"): + if line.startswith("MemTotal:"): + kb = 
int(line.split()[1]) + info.total_memory_gb = kb / (1024 * 1024) + elif line.startswith("MemAvailable:"): + kb = int(line.split()[1]) + info.available_memory_gb = kb / (1024 * 1024) + + # CPU cores + info.cpu_cores = os.cpu_count() or 1 + + # Check for NVIDIA GPU + try: + result = subprocess.run( + ["nvidia-smi", "--query-gpu=name,memory.total,memory.free", + "--format=csv,noheader,nounits"], + capture_output=True, text=True, timeout=10 + ) + if result.returncode == 0 and result.stdout.strip(): + lines = result.stdout.strip().split("\n") + if lines: + parts = lines[0].split(", ") + if len(parts) >= 3: + info.gpu_name = parts[0].strip() + info.gpu_memory_gb = float(parts[1]) / 1024 # MB to GB + gpu_free = float(parts[2]) / 1024 + # Use GPU free for VRAM-based selection + info.available_memory_gb = max(info.available_memory_gb, gpu_free) + info.detection_method = "nvidia-smi" + except (FileNotFoundError, subprocess.TimeoutExpired): + pass # No NVIDIA GPU + + except Exception as e: + logger.warning(f"Linux detection failed: {e}") + info.total_memory_gb = 16.0 + info.available_memory_gb = 12.0 + info.detection_method = "fallback" + + return info + + +def _detect_generic(system: str) -> HardwareInfo: + """Fallback detection for unknown systems.""" + import psutil + mem = psutil.virtual_memory() + return HardwareInfo( + total_memory_gb=mem.total / (1024**3), + available_memory_gb=mem.available / (1024**3), + cpu_cores=os.cpu_count() or 1, + detection_method="psutil", + ) + + +# ── KV Cache Memory Estimation ─────────────────────────────────────────────── + +def estimate_kv_cache_gb( + context_length: int, + num_layers: int = 48, + num_kv_heads: int = 8, + head_dim: int = 128, + bits_per_channel: float = 3.5, +) -> float: + """Estimate KV cache memory for given parameters. 
# ── KV Cache Memory Estimation ─────────────────────────────────────────────── 

def estimate_kv_cache_gb(
    context_length: int,
    num_layers: int = 48,
    num_kv_heads: int = 8,
    head_dim: int = 128,
    bits_per_channel: float = 3.5,
) -> float:
    """Estimate KV cache memory for given parameters.

    Formula: 2 (K+V) × layers × kv_heads × head_dim × context_length × bits/8

    Args:
        context_length: Number of tokens the cache must hold.
        num_layers: Number of transformer layers.
        num_kv_heads: Number of KV attention heads.
        head_dim: Dimension per attention head.
        bits_per_channel: Effective stored bits per channel.

    Returns:
        Estimated KV cache size in GiB.
    """
    bytes_per_element = bits_per_channel / 8.0
    total_bytes = 2 * num_layers * num_kv_heads * head_dim * context_length * bytes_per_element
    return total_bytes / (1024**3)


def estimate_model_memory_gb(model_size_gb: float, quant_type: str = "q4_k_m") -> float:
    """Estimate model weights memory. Returns loaded size in GB.

    ``model_size_gb`` is assumed to already be the on-disk quantized size,
    so it is returned unchanged. ``quant_type`` is retained for interface
    compatibility and future per-format adjustment; it is currently unused.
    This is a rough estimate — actual usage depends on the exact quant
    format.
    """
    # NOTE: a table of quant-to-fp16 size ratios previously lived here but
    # was never consulted; removed as dead code.
    return model_size_gb


# ── Selection Logic ───────────────────────────────────────────────────────── 

@dataclass
class QuantSelection:
    """Result of quantization level selection."""
    # String (forward-reference) annotations keep this dataclass importable
    # independently of QuantLevel/HardwareInfo definition order.
    level: "QuantLevel"            # chosen compression level
    hardware: "HardwareInfo"       # hardware snapshot the decision used
    reasoning: str                 # human-readable explanation
    total_required_gb: float       # model weights + KV cache estimate
    available_gb: float            # memory pool selection was made against
    headroom_gb: float             # available - required (may be negative)
    env_vars: dict = field(default_factory=dict)      # e.g. TURBO_LAYER_ADAPTIVE
    server_flags: dict = field(default_factory=dict)  # llama.cpp server flags
    warnings: list = field(default_factory=list)      # human-readable cautions
def select_quant_level(
    model_size_gb: float = 14.0,
    context_length: int = 32768,
    num_layers: int = 48,
    num_kv_heads: int = 8,
    head_dim: int = 128,
    preferred_level: Optional[str] = None,
    force_cpu: bool = False,
) -> QuantSelection:
    """Select the best quantization level for available hardware.

    Args:
        model_size_gb: Size of the model weights in GB
        context_length: Target context length (token count)
        num_layers: Number of transformer layers
        num_kv_heads: Number of KV attention heads
        head_dim: Dimension per attention head
        preferred_level: Force a specific level (still checks if it fits)
        force_cpu: If True, ignore GPU memory

    Returns:
        QuantSelection with the chosen level and reasoning
    """
    hw = detect_hardware()

    if force_cpu:
        hw.gpu_memory_gb = None
        hw.gpu_name = None

    # Pick the memory pool the model must fit into:
    #   Apple Silicon -> unified memory (checked FIRST: Apple detection also
    #       fills gpu_name/gpu_memory_gb, which previously sent it down the
    #       VRAM branch and mislabeled unified memory as "VRAM")
    #   NVIDIA        -> dedicated VRAM
    #   CPU-only      -> system RAM
    if hw.is_apple_silicon:
        memory_pool_gb = hw.total_memory_gb
        memory_label = f"{hw.chip_name or 'Apple Silicon'} {hw.total_memory_gb:.0f}GB unified"
    elif hw.gpu_memory_gb and hw.gpu_name:
        memory_pool_gb = hw.gpu_memory_gb
        memory_label = f"{hw.gpu_name} {hw.gpu_memory_gb:.0f}GB VRAM"
    else:
        memory_pool_gb = hw.total_memory_gb
        memory_label = f"{hw.cpu_cores}c CPU {hw.total_memory_gb:.0f}GB RAM"

    model_mem = estimate_model_memory_gb(model_size_gb)

    # Walk levels best-quality-first; take the first whose required headroom
    # fits. A preferred level restricts the walk to that single level.
    chosen = None
    for level in QUANT_LEVELS:
        if preferred_level and level.name != preferred_level:
            continue

        kv_mem = estimate_kv_cache_gb(
            context_length, num_layers, num_kv_heads, head_dim,
            level.bits_per_channel
        )
        total_required = model_mem + kv_mem
        headroom = memory_pool_gb - total_required

        if headroom >= level.min_memory_headroom_gb:
            chosen = level
            break

        if preferred_level and level.name == preferred_level:
            # Caller forced this level even though it does not fit; honor it
            # and let the warnings below flag the shortfall.
            chosen = level
            break

    if chosen is None:
        # Nothing fits — pick the most aggressive compression.
        chosen = QUANT_LEVELS[-1]
        logger.warning("No quant level fits in %.1fGB. Using %s.", memory_pool_gb, chosen.name)

    # Recompute the final numbers for the chosen level.
    kv_mem = estimate_kv_cache_gb(
        context_length, num_layers, num_kv_heads, head_dim,
        chosen.bits_per_channel
    )
    total_required = model_mem + kv_mem
    headroom = memory_pool_gb - total_required

    # Human-readable explanation of the decision.
    # BUGFIX: context_length is a raw token count (e.g. 32768), so the old
    # "@ {context_length}K context" rendered as "32768K"; the bogus K is gone.
    reasoning_parts = [
        f"{memory_label}:",
        f"{chosen.name} ({chosen.quality_label}, {chosen.bits_per_channel:.1f}b/ch,",
        f"{chosen.compression_ratio:.1f}x compression)",
        f"fits {model_mem:.1f}GB model + {kv_mem:.1f}GB KV cache",
        f"@ {context_length} context = {total_required:.1f}GB / {memory_pool_gb:.0f}GB",
        f"({headroom:.1f}GB headroom)"
    ]
    reasoning = " ".join(reasoning_parts)

    # Environment variables for llama.cpp.
    env_vars = {
        "TURBO_LAYER_ADAPTIVE": str(chosen.layer_adaptive),
    }

    # Server flags.
    server_flags = {
        "-ctk": chosen.kv_type,
        "-ctv": chosen.kv_type,
        "-c": str(context_length),
    }

    # Warnings — overcommit supersedes the generic low-headroom note
    # (previously both fired when headroom was negative).
    warnings = []
    if headroom < 0:
        warnings.append(
            f"OVERCOMMITTED: needs {total_required:.1f}GB but only {memory_pool_gb:.0f}GB available. "
            f"Inference may fail or swap heavily."
        )
    elif headroom < 2.0:
        warnings.append(
            f"Low headroom ({headroom:.1f}GB). Consider reducing context length or model size."
        )

    selection = QuantSelection(
        level=chosen,
        hardware=hw,
        reasoning=reasoning,
        total_required_gb=total_required,
        available_gb=memory_pool_gb,
        headroom_gb=headroom,
        env_vars=env_vars,
        server_flags=server_flags,
        warnings=warnings,
    )

    logger.info("Quant selection: %s", reasoning)
    for w in warnings:
        logger.warning(w)

    return selection


# ── CLI ─────────────────────────────────────────────────────────────────────── 

def main():
    """CLI entry point for quant level selection."""
    import argparse
    import json

    parser = argparse.ArgumentParser(
        description="Auto-select TurboQuant compression level based on available hardware"
    )
    parser.add_argument("--model-size", type=float, default=14.0,
                        help="Model size in GB (default: 14.0)")
    parser.add_argument("--context", type=int, default=32768,
                        help="Target context length (default: 32768)")
    parser.add_argument("--layers", type=int, default=48,
                        help="Number of transformer layers (default: 48)")
    parser.add_argument("--kv-heads", type=int, default=8,
                        help="Number of KV attention heads (default: 8)")
    parser.add_argument("--head-dim", type=int, default=128,
                        help="Dimension per attention head (default: 128)")
    parser.add_argument("--prefer", type=str, default=None,
                        choices=[lvl.name for lvl in QUANT_LEVELS],
                        help="Prefer a specific quant level")
    parser.add_argument("--force-cpu", action="store_true",
                        help="Ignore GPU, use CPU memory only")
    parser.add_argument("--json", action="store_true",
                        help="JSON output for automation")
    parser.add_argument("--detect-only", action="store_true",
                        help="Only detect hardware, don't select")
    args = parser.parse_args()

    logging.basicConfig(level=logging.INFO, format="%(message)s")

    if args.detect_only:
        hw = detect_hardware()
        if args.json:
            print(json.dumps(hw.__dict__, default=str, indent=2))
        else:
            print(f"Total memory: {hw.total_memory_gb:.1f} GB")
            print(f"Available: {hw.available_memory_gb:.1f} GB")
            if hw.gpu_memory_gb:
                print(f"GPU memory: {hw.gpu_memory_gb:.1f} GB")
            if hw.gpu_name:
                print(f"GPU: {hw.gpu_name}")
            if hw.is_apple_silicon:
                print(f"Chip: {hw.chip_name or 'Apple Silicon'}")
            print(f"CPU cores: {hw.cpu_cores}")
            print(f"Detection: {hw.detection_method}")
        return

    selection = select_quant_level(
        model_size_gb=args.model_size,
        context_length=args.context,
        num_layers=args.layers,
        num_kv_heads=args.kv_heads,
        head_dim=args.head_dim,
        preferred_level=args.prefer,
        force_cpu=args.force_cpu,
    )

    if args.json:
        result = {
            "level": selection.level.name,
            "bits_per_channel": selection.level.bits_per_channel,
            "compression_ratio": selection.level.compression_ratio,
            "quality": selection.level.quality_label,
            "reasoning": selection.reasoning,
            "total_required_gb": round(selection.total_required_gb, 2),
            "available_gb": round(selection.available_gb, 1),
            "headroom_gb": round(selection.headroom_gb, 2),
            "env_vars": selection.env_vars,
            "server_flags": selection.server_flags,
            "warnings": selection.warnings,
            "hardware": {
                "total_memory_gb": round(selection.hardware.total_memory_gb, 1),
                "gpu_name": selection.hardware.gpu_name,
                "is_apple_silicon": selection.hardware.is_apple_silicon,
                "chip_name": selection.hardware.chip_name,
                "cpu_cores": selection.hardware.cpu_cores,
            },
        }
        print(json.dumps(result, indent=2))
    else:
        print(f"Selected: {selection.level.name} ({selection.level.quality_label})")
        print(f"  {selection.reasoning}")
        print()
        print("Environment variables:")
        for k, v in selection.env_vars.items():
            print(f"  export {k}={v}")
        print()
        print("Server flags:")
        for k, v in selection.server_flags.items():
            print(f"  {k} {v}")
        if selection.warnings:
            print()
            for w in selection.warnings:
                print(f"  WARNING: {w}")


if __name__ == "__main__":
    main()