Compare commits

..

7 Commits

Author SHA1 Message Date
Alexander Whitestone
a537511652 refactor: consolidate hardware optimizer with quant selector (#92)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 17s
2026-04-20 20:38:56 -04:00
492c1cdcfd Merge PR #90
All checks were successful
Smoke Test / smoke (pull_request) Successful in 13s
Merged PR #90: feat: integration test — turboquant compressed model
2026-04-17 01:52:09 +00:00
6e583310a8 Merge PR #91
Merged PR #91: feat: auto-select quantization based on available VRAM
2026-04-17 01:52:06 +00:00
300918ee1e test: quant selector tests (#81)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 12s
2026-04-15 15:04:41 +00:00
f7ea01cb65 feat: auto-select quantization based on available VRAM (#81) 2026-04-15 15:03:04 +00:00
d2edbdadc2 test: add tool call integration tests (#82)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 11s
2026-04-15 14:53:47 +00:00
c009d8df77 test: add pytest conftest (#82) 2026-04-15 14:53:45 +00:00
10 changed files with 1101 additions and 910 deletions

View File

@@ -1,88 +0,0 @@
{
"version": "1.0.0",
"updated": "2026-04-15",
"description": "Offline crisis resources cache for edge deployment",
"national_resources": [
{
"name": "988 Suicide & Crisis Lifeline",
"phone": "988",
"text": "988",
"url": "https://988lifeline.org",
"description": "Free, confidential support 24/7 for people in distress. Call or text 988.",
"languages": ["en", "es"],
"available": "24/7"
},
{
"name": "Crisis Text Line",
"text": "HOME to 741741",
"url": "https://www.crisistextline.org",
"description": "Free crisis support via text message. Text HOME to 741741.",
"languages": ["en", "es"],
"available": "24/7"
},
{
"name": "Veterans Crisis Line",
"phone": "988 (press 1)",
"text": "838255",
"url": "https://www.veteranscrisisline.net",
"description": "Support for Veterans and their loved ones. Call 988, press 1.",
"available": "24/7"
},
{
"name": "Trevor Project (LGBTQ+ Youth)",
"phone": "1-866-488-7386",
"text": "START to 678-678",
"url": "https://www.thetrevorproject.org",
"description": "Crisis intervention and suicide prevention for LGBTQ+ young people.",
"available": "24/7"
},
{
"name": "SAMHSA National Helpline",
"phone": "1-800-662-4357",
"url": "https://www.samhsa.gov/find-help/national-helpline",
"description": "Free, confidential, 24/7 treatment referral and information service.",
"available": "24/7"
}
],
"international_resources": [
{
"name": "International Association for Suicide Prevention",
"url": "https://www.iasp.info/resources/Crisis_Centres/",
"description": "Directory of crisis centers worldwide."
},
{
"name": "Befrienders Worldwide",
"url": "https://www.befrienders.org",
"description": "Emotional support to prevent suicide worldwide."
},
{
"name": "Canada — Talk Suicide",
"phone": "1-833-456-4566",
"text": "456456"
},
{
"name": "UK — Samaritans",
"phone": "116 123",
"email": "jo@samaritans.org"
},
{
"name": "Australia — Lifeline",
"phone": "13 11 14",
"text": "0477 13 11 14"
}
],
"local_resources": [],
"self_help_prompts": [
"Take a slow breath. Inhale for 4 seconds, hold for 4, exhale for 6.",
"Look around. Name 5 things you can see, 4 you can touch, 3 you can hear.",
"You are not alone. This feeling will pass.",
"Call someone you trust right now.",
"Step outside if you can. Fresh air and movement can help.",
"Write down what you're feeling. Getting it out helps.",
"This moment is not your whole life. It's one moment."
]
}

View File

@@ -1,223 +0,0 @@
# Edge Crisis Detection — Deployment Guide
**Part of:** turboquant#99 (1-Bit Models + Edge)
**Issue:** #102
## Overview
Deploy a minimal crisis detection system on edge devices for offline use.
When internet is unavailable but someone is in crisis, a local device can
detect distress signals and display cached crisis resources.
## Target Hardware
| Device | RAM | Notes |
|--------|-----|-------|
| Raspberry Pi 4 | 4GB | Recommended. Runs keyword + Falcon-H1-Tiny-90M |
| Raspberry Pi 4 | 2GB | Keyword detection only (no LLM) |
| Old Android phone | 2GB+ | Termux + llama.cpp, Falcon-H1-Tiny-90M |
| Any x86 SBC | 2GB+ | Full keyword + optional small model |
## Model Selection
### Tier 0: Keyword Detection (any device, <10MB)
- No model needed — pure pattern matching
- Instant response (<1ms)
- Works on 512MB RAM devices
- Covers 80%+ of explicit crisis language
- **Use when:** RAM < 2GB or first-boot before model download
### Tier 1: Falcon-H1-Tiny-90M (~180MB quantized)
- Detects nuanced/implicit distress that keywords miss
- Runs on 2GB+ RAM (Pi 4 4GB recommended)
- ~200ms inference on Pi 4 (CPU)
- Quantized Q4_K_M via llama.cpp
- **Use when:** RAM >= 2GB, want higher recall
### Tier 2: Bonsai-1.7B (~900MB quantized)
- Best accuracy for ambiguous cases
- Needs 3GB+ RAM
- ~1.5s inference on Pi 4
- **Use when:** RAM >= 4GB, false-positive tolerance is low
### Recommendation
Start with **Tier 0 + Tier 1**. Keyword catches obvious cases instantly,
Falcon-H1 catches implicit cases with 200ms latency. Together they cover
>95% of crisis signals with negligible resource use.
## Installation
### Raspberry Pi 4
```bash
# 1. System setup
sudo apt update && sudo apt install -y python3 python3-pip git cmake
# 2. Clone this directory
git clone https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant.git
cd turboquant
# 3. Python keyword detector runs with zero dependencies (pure stdlib)
# 4. (Optional) Build llama.cpp for Tier 1 model
git clone https://github.com/ggerganov/llama.cpp
cd llama.cpp && make -j4 && cd ..
# 5. Download model (Tier 1)
mkdir -p models
# Falcon-H1-Tiny-90M GGUF — find latest on HuggingFace
# wget -O models/falcon-h1-tiny-90m-q4km.gguf <URL>
# 6. Test offline crisis detection
python3 scripts/crisis_detector.py --test
```
### Android (Termux)
```bash
pkg install python git cmake
# Follow Pi steps above, but build llama.cpp with:
cmake -B build -DLLAMA_NATIVE=OFF && cmake --build build -j$(nproc)
```
### Auto-Start on Boot (Pi)
```bash
# Add to /etc/rc.local (before 'exit 0'):
python3 /home/pi/turboquant/scripts/crisis_detector.py --daemon &
```
Or create a systemd service:
```ini
# /etc/systemd/system/crisis-detector.service
[Unit]
Description=Edge Crisis Detector
After=network.target
[Service]
ExecStart=/usr/bin/python3 /home/pi/turboquant/scripts/crisis_detector.py --daemon
Restart=always
User=pi
[Install]
WantedBy=multi-user.target
```
```bash
sudo systemctl enable crisis-detector
sudo systemctl start crisis-detector
```
## Offline Resource Cache
The file `data/crisis_resources.json` is bundled with the deployment.
It contains:
- **988 Suicide & Crisis Lifeline** — call or text 988
- **Crisis Text Line** — text HOME to 741741
- **International Association for Suicide Prevention** — global directory
- Cached local resources (customize per deployment location)
These display immediately when a crisis is detected — no network required.
## How It Works
```
User input
|
v
+-------------------+
| Keyword Matcher | <- Tier 0: instant, no model
| (regex/pattern) |
+--------+----------+
match? --yes--> Show crisis resources
|
no
v
+-------------------+
| Falcon-H1-Tiny | <- Tier 1: ~200ms on Pi 4
| (if available) |
+--------+----------+
crisis? --yes--> Show crisis resources
|
no
v
Continue normally
```
## Testing Offline
```bash
# Disconnect from internet
sudo ip link set wlan0 down
# Run the test suite
python3 scripts/crisis_detector.py --test
# Expected: all tests pass, resources display correctly
# Reconnect
sudo ip link set wlan0 up
```
## File Structure
```
turboquant/
+-- scripts/
| +-- crisis_detector.py # Main detector (keyword + optional LLM)
+-- data/
| +-- crisis_resources.json # Offline resource cache
+-- tests/
| +-- test_edge_crisis.py # Offline verification tests
+-- docs/
+-- edge-crisis-deployment.md # This file
```
## Customization
### Adding Local Resources
Edit `data/crisis_resources.json`:
```json
{
"local_resources": [
{
"name": "City Crisis Center",
"phone": "555-0123",
"address": "123 Main St",
"hours": "24/7"
}
]
}
```
### Adjusting Sensitivity
In `scripts/crisis_detector.py`:
```python
# Keyword threshold: how many keywords trigger a match
KEYWORD_THRESHOLD = 1 # 1 = any keyword triggers (high recall)
# 2 = need 2+ keywords (higher precision)
# LLM threshold (Tier 1/2): confidence score cutoff
LLM_THRESHOLD = 0.6 # 0.6 = default (balanced)
# 0.4 = more sensitive
# 0.8 = more precise
```
## Privacy
- **No data leaves the device.** All detection runs locally.
- No logs of user input are stored by default.
- Enable logging only for debugging (`--log` flag).
- No network calls are made by the crisis detector.
- Resource display is a local text render.
## License
Same as parent project. Crisis detection code and resource data are
provided for humanitarian purposes.

View File

@@ -1,5 +1,29 @@
"""Phase 19: Hardware-Aware Inference Optimization.
Part of the TurboQuant suite for local inference excellence.
"""Backward-compatible shim for hardware-aware quantization selection.
The original Phase 19 placeholder `hardware_optimizer.py` never shipped real
logic. The canonical implementation now lives in `evolution.quant_selector`.
This shim preserves the legacy import path for any downstream callers while
making `quant_selector.py` the single source of truth.
"""
import logging
# ... (rest of the code)
from evolution.quant_selector import ( # noqa: F401
HardwareInfo,
QuantLevel,
QuantSelection,
QUANT_LEVELS,
detect_hardware,
estimate_kv_cache_gb,
estimate_model_memory_gb,
select_quant_level,
)
__all__ = [
"HardwareInfo",
"QuantLevel",
"QuantSelection",
"QUANT_LEVELS",
"detect_hardware",
"estimate_kv_cache_gb",
"estimate_model_memory_gb",
"select_quant_level",
]

548
evolution/quant_selector.py Normal file
View File

@@ -0,0 +1,548 @@
"""Auto-select TurboQuant compression level based on available VRAM/RAM.
Detects hardware resources at startup and picks the highest quality
quantization level that fits within available memory. Supports Apple
Silicon unified memory, NVIDIA GPUs (via nvidia-smi), and CPU-only fallback.
Usage:
from evolution.quant_selector import select_quant_level
selection = select_quant_level(model_size_gb=14.0, context_length=32768)
print(selection.level) # "turbo4"
print(selection.reasoning) # "M4 Max 36GB unified: turbo4 fits 14.0GB model + ..."
print(selection.env_vars) # {"TURBO_LAYER_ADAPTIVE": "7"}
"""
import logging
import os
import platform
import subprocess
import sys
from dataclasses import dataclass, field
from pathlib import Path
from typing import Optional
logger = logging.getLogger(__name__)
# ── Quant Level Definitions ───────────────────────────────────────────────────
@dataclass
class QuantLevel:
"""A TurboQuant compression level with its memory characteristics."""
name: str # e.g. "turbo4"
bits_per_channel: float # e.g. 3.5 for turbo4
compression_ratio: float # vs uncompressed KV cache
quality_label: str # "best", "high", "balanced", "fast"
layer_adaptive: int # TURBO_LAYER_ADAPTIVE value (0-7)
kv_type: str # -ctk/-ctv flag value
min_memory_headroom_gb: float # Minimum free memory to recommend this level
description: str = ""
# Ordered from highest quality to most aggressive compression
QUANT_LEVELS = [
QuantLevel(
name="turbo4",
bits_per_channel=3.5,
compression_ratio=4.2,
quality_label="best",
layer_adaptive=7,
kv_type="turbo4",
min_memory_headroom_gb=4.0,
description="PolarQuant + QJL 4-bit. Best quality, ~4.2x KV compression."
),
QuantLevel(
name="turbo3",
bits_per_channel=2.5,
compression_ratio=6.0,
quality_label="high",
layer_adaptive=5,
kv_type="turbo3",
min_memory_headroom_gb=3.0,
description="3-bit TurboQuant. High quality, ~6x KV compression."
),
QuantLevel(
name="turbo2",
bits_per_channel=1.5,
compression_ratio=10.0,
quality_label="balanced",
layer_adaptive=3,
kv_type="turbo2",
min_memory_headroom_gb=2.0,
description="2-bit TurboQuant. Balanced, ~10x KV compression."
),
QuantLevel(
name="q4_0",
bits_per_channel=4.0,
compression_ratio=3.5,
quality_label="fast",
layer_adaptive=0,
kv_type="q4_0",
min_memory_headroom_gb=1.5,
description="Standard 4-bit quant. Fast fallback, no TurboQuant."
),
]
# ── Hardware Detection ────────────────────────────────────────────────────────
@dataclass
class HardwareInfo:
"""Detected hardware resources."""
total_memory_gb: float
available_memory_gb: float
gpu_memory_gb: Optional[float] = None
gpu_name: Optional[str] = None
is_apple_silicon: bool = False
chip_name: Optional[str] = None
cpu_cores: int = 0
detection_method: str = ""
def detect_hardware() -> HardwareInfo:
"""Detect available memory and GPU resources."""
system = platform.system()
if system == "Darwin":
return _detect_apple_silicon()
elif system == "Linux":
return _detect_linux()
else:
return _detect_generic(system)
def _detect_apple_silicon() -> HardwareInfo:
"""Detect Apple Silicon unified memory."""
info = HardwareInfo(
total_memory_gb=0,
available_memory_gb=0,
is_apple_silicon=True,
detection_method="sysctl",
)
try:
# Get total memory
result = subprocess.run(
["sysctl", "-n", "hw.memsize"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.total_memory_gb = int(result.stdout.strip()) / (1024**3)
# Get chip name
result = subprocess.run(
["sysctl", "-n", "machdep.cpu.brand_string"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.chip_name = result.stdout.strip()
# Try to get GPU name (Apple Silicon)
result = subprocess.run(
["system_profiler", "SPDisplaysDataType"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0:
for line in result.stdout.split("\n"):
if "Chipset" in line or "GPU" in line:
info.gpu_name = line.split(":")[-1].strip()
break
# Estimate available memory (vm_stat)
result = subprocess.run(
["vm_stat"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
page_size = 4096 # macOS default
free_pages = 0
for line in result.stdout.split("\n"):
if "Pages free:" in line:
try:
free_pages = int(line.split(":")[-1].strip().rstrip("."))
except ValueError:
pass
# Available ≈ free + some speculative (conservative: just free)
info.available_memory_gb = (free_pages * page_size) / (1024**3)
# Fallback if vm_stat parsing failed
if info.available_memory_gb < 1:
# Conservative: 70% of total
info.available_memory_gb = info.total_memory_gb * 0.70
# Apple Silicon shares memory — GPU memory = total memory
info.gpu_memory_gb = info.total_memory_gb
# Detect CPU cores
result = subprocess.run(
["sysctl", "-n", "hw.ncpu"],
capture_output=True, text=True, timeout=5
)
if result.returncode == 0:
info.cpu_cores = int(result.stdout.strip())
except Exception as e:
logger.warning(f"Apple Silicon detection failed: {e}")
# Fallback
info.total_memory_gb = 16.0
info.available_memory_gb = 12.0
info.detection_method = "fallback"
return info
def _detect_linux() -> HardwareInfo:
"""Detect Linux system with optional NVIDIA GPU."""
info = HardwareInfo(
total_memory_gb=0,
available_memory_gb=0,
detection_method="proc",
)
try:
# Read /proc/meminfo
with open("/proc/meminfo", "r") as f:
meminfo = f.read()
for line in meminfo.split("\n"):
if line.startswith("MemTotal:"):
kb = int(line.split()[1])
info.total_memory_gb = kb / (1024 * 1024)
elif line.startswith("MemAvailable:"):
kb = int(line.split()[1])
info.available_memory_gb = kb / (1024 * 1024)
# CPU cores
info.cpu_cores = os.cpu_count() or 1
# Check for NVIDIA GPU
try:
result = subprocess.run(
["nvidia-smi", "--query-gpu=name,memory.total,memory.free",
"--format=csv,noheader,nounits"],
capture_output=True, text=True, timeout=10
)
if result.returncode == 0 and result.stdout.strip():
lines = result.stdout.strip().split("\n")
if lines:
parts = lines[0].split(", ")
if len(parts) >= 3:
info.gpu_name = parts[0].strip()
info.gpu_memory_gb = float(parts[1]) / 1024 # MB to GB
gpu_free = float(parts[2]) / 1024
# Use GPU free for VRAM-based selection
info.available_memory_gb = max(info.available_memory_gb, gpu_free)
info.detection_method = "nvidia-smi"
except (FileNotFoundError, subprocess.TimeoutExpired):
pass # No NVIDIA GPU
except Exception as e:
logger.warning(f"Linux detection failed: {e}")
info.total_memory_gb = 16.0
info.available_memory_gb = 12.0
info.detection_method = "fallback"
return info
def _detect_generic(system: str) -> HardwareInfo:
"""Fallback detection for unknown systems."""
import psutil
mem = psutil.virtual_memory()
return HardwareInfo(
total_memory_gb=mem.total / (1024**3),
available_memory_gb=mem.available / (1024**3),
cpu_cores=os.cpu_count() or 1,
detection_method="psutil",
)
# ── KV Cache Memory Estimation ───────────────────────────────────────────────
def estimate_kv_cache_gb(
context_length: int,
num_layers: int = 48,
num_kv_heads: int = 8,
head_dim: int = 128,
bits_per_channel: float = 3.5,
) -> float:
"""Estimate KV cache memory for given parameters.
Formula: 2 (K+V) × layers × kv_heads × head_dim × context_length × bits/8
"""
bytes_per_element = bits_per_channel / 8.0
total_bytes = 2 * num_layers * num_kv_heads * head_dim * context_length * bytes_per_element
return total_bytes / (1024**3)
def estimate_model_memory_gb(model_size_gb: float, quant_type: str = "q4_k_m") -> float:
"""Estimate model weights memory. Returns loaded size in GB.
This is a rough estimate — actual depends on exact quant format.
"""
# Common quant ratios (vs fp16)
quant_multipliers = {
"f16": 1.0,
"q8_0": 0.5,
"q6_k": 0.42,
"q5_k_m": 0.37,
"q4_k_m": 0.32,
"q3_k_m": 0.27,
"q2_k": 0.22,
}
# model_size_gb is already quantized size
return model_size_gb
# ── Selection Logic ───────────────────────────────────────────────────────────
@dataclass
class QuantSelection:
"""Result of quantization level selection."""
level: QuantLevel
hardware: HardwareInfo
reasoning: str
total_required_gb: float
available_gb: float
headroom_gb: float
env_vars: dict = field(default_factory=dict)
server_flags: dict = field(default_factory=dict)
warnings: list = field(default_factory=list)
def select_quant_level(
model_size_gb: float = 14.0,
context_length: int = 32768,
num_layers: int = 48,
num_kv_heads: int = 8,
head_dim: int = 128,
preferred_level: Optional[str] = None,
force_cpu: bool = False,
) -> QuantSelection:
"""Select the best quantization level for available hardware.
Args:
model_size_gb: Size of the model weights in GB
context_length: Target context length
num_layers: Number of transformer layers
num_kv_heads: Number of KV attention heads
head_dim: Dimension per attention head
preferred_level: Force a specific level (still checks if it fits)
force_cpu: If True, ignore GPU memory
Returns:
QuantSelection with the chosen level and reasoning
"""
hw = detect_hardware()
if force_cpu:
hw.gpu_memory_gb = None
hw.gpu_name = None
# Use the most restrictive memory constraint
# For Apple Silicon: unified memory, use total
# For NVIDIA: use GPU VRAM
# For CPU-only: use system RAM
if hw.gpu_memory_gb and hw.gpu_name:
memory_pool_gb = hw.gpu_memory_gb
memory_label = f"{hw.gpu_name} {hw.gpu_memory_gb:.0f}GB VRAM"
elif hw.is_apple_silicon:
memory_pool_gb = hw.total_memory_gb
memory_label = f"{hw.chip_name or 'Apple Silicon'} {hw.total_memory_gb:.0f}GB unified"
else:
memory_pool_gb = hw.total_memory_gb
memory_label = f"{hw.cpu_cores}c CPU {hw.total_memory_gb:.0f}GB RAM"
model_mem = estimate_model_memory_gb(model_size_gb)
# Try levels from best to most compressed
chosen = None
for level in QUANT_LEVELS:
if preferred_level and level.name != preferred_level:
continue
kv_mem = estimate_kv_cache_gb(
context_length, num_layers, num_kv_heads, head_dim,
level.bits_per_channel
)
total_required = model_mem + kv_mem
headroom = memory_pool_gb - total_required
if headroom >= level.min_memory_headroom_gb:
chosen = level
break
if preferred_level and level.name == preferred_level:
# User forced this level but it doesn't fit
chosen = level
break
if chosen is None:
# Nothing fits — pick the most aggressive compression
chosen = QUANT_LEVELS[-1]
logger.warning(f"No quant level fits in {memory_pool_gb:.1f}GB. Using {chosen.name}.")
# Calculate final numbers
kv_mem = estimate_kv_cache_gb(
context_length, num_layers, num_kv_heads, head_dim,
chosen.bits_per_channel
)
total_required = model_mem + kv_mem
headroom = memory_pool_gb - total_required
# Build reasoning
reasoning_parts = [
f"{memory_label}:",
f"{chosen.name} ({chosen.quality_label}, {chosen.bits_per_channel:.1f}b/ch,",
f"{chosen.compression_ratio:.1f}x compression)",
f"fits {model_mem:.1f}GB model + {kv_mem:.1f}GB KV cache",
f"@ {context_length}K context = {total_required:.1f}GB / {memory_pool_gb:.0f}GB",
f"({headroom:.1f}GB headroom)"
]
reasoning = " ".join(reasoning_parts)
# Build environment variables for llama.cpp
env_vars = {
"TURBO_LAYER_ADAPTIVE": str(chosen.layer_adaptive),
}
# Build server flags
server_flags = {
"-ctk": chosen.kv_type,
"-ctv": chosen.kv_type,
"-c": str(context_length),
}
# Warnings
warnings = []
if headroom < 2.0:
warnings.append(
f"Low headroom ({headroom:.1f}GB). Consider reducing context length or model size."
)
if headroom < 0:
warnings.append(
f"OVERCOMMITTED: needs {total_required:.1f}GB but only {memory_pool_gb:.0f}GB available. "
f"Inference may fail or swap heavily."
)
selection = QuantSelection(
level=chosen,
hardware=hw,
reasoning=reasoning,
total_required_gb=total_required,
available_gb=memory_pool_gb,
headroom_gb=headroom,
env_vars=env_vars,
server_flags=server_flags,
warnings=warnings,
)
logger.info(f"Quant selection: {reasoning}")
for w in warnings:
logger.warning(w)
return selection
# ── CLI ───────────────────────────────────────────────────────────────────────
def main():
"""CLI entry point for quant level selection."""
import argparse
import json
parser = argparse.ArgumentParser(
description="Auto-select TurboQuant compression level based on available hardware"
)
parser.add_argument("--model-size", type=float, default=14.0,
help="Model size in GB (default: 14.0)")
parser.add_argument("--context", type=int, default=32768,
help="Target context length (default: 32768)")
parser.add_argument("--layers", type=int, default=48,
help="Number of transformer layers (default: 48)")
parser.add_argument("--kv-heads", type=int, default=8,
help="Number of KV attention heads (default: 8)")
parser.add_argument("--head-dim", type=int, default=128,
help="Dimension per attention head (default: 128)")
parser.add_argument("--prefer", type=str, default=None,
choices=[l.name for l in QUANT_LEVELS],
help="Prefer a specific quant level")
parser.add_argument("--force-cpu", action="store_true",
help="Ignore GPU, use CPU memory only")
parser.add_argument("--json", action="store_true",
help="JSON output for automation")
parser.add_argument("--detect-only", action="store_true",
help="Only detect hardware, don't select")
args = parser.parse_args()
logging.basicConfig(level=logging.INFO, format="%(message)s")
if args.detect_only:
hw = detect_hardware()
if args.json:
print(json.dumps(hw.__dict__, default=str, indent=2))
else:
print(f"Total memory: {hw.total_memory_gb:.1f} GB")
print(f"Available: {hw.available_memory_gb:.1f} GB")
if hw.gpu_memory_gb:
print(f"GPU memory: {hw.gpu_memory_gb:.1f} GB")
if hw.gpu_name:
print(f"GPU: {hw.gpu_name}")
if hw.is_apple_silicon:
print(f"Chip: {hw.chip_name or 'Apple Silicon'}")
print(f"CPU cores: {hw.cpu_cores}")
print(f"Detection: {hw.detection_method}")
return
selection = select_quant_level(
model_size_gb=args.model_size,
context_length=args.context,
num_layers=args.layers,
num_kv_heads=args.kv_heads,
head_dim=args.head_dim,
preferred_level=args.prefer,
force_cpu=args.force_cpu,
)
if args.json:
result = {
"level": selection.level.name,
"bits_per_channel": selection.level.bits_per_channel,
"compression_ratio": selection.level.compression_ratio,
"quality": selection.level.quality_label,
"reasoning": selection.reasoning,
"total_required_gb": round(selection.total_required_gb, 2),
"available_gb": round(selection.available_gb, 1),
"headroom_gb": round(selection.headroom_gb, 2),
"env_vars": selection.env_vars,
"server_flags": selection.server_flags,
"warnings": selection.warnings,
"hardware": {
"total_memory_gb": round(selection.hardware.total_memory_gb, 1),
"gpu_name": selection.hardware.gpu_name,
"is_apple_silicon": selection.hardware.is_apple_silicon,
"chip_name": selection.hardware.chip_name,
"cpu_cores": selection.hardware.cpu_cores,
},
}
print(json.dumps(result, indent=2))
else:
print(f"Selected: {selection.level.name} ({selection.level.quality_label})")
print(f" {selection.reasoning}")
print()
print(f"Environment variables:")
for k, v in selection.env_vars.items():
print(f" export {k}={v}")
print()
print(f"Server flags:")
for k, v in selection.server_flags.items():
print(f" {k} {v}")
if selection.warnings:
print()
for w in selection.warnings:
print(f" WARNING: {w}")
if __name__ == "__main__":
main()

View File

@@ -1,386 +0,0 @@
#!/usr/bin/env python3
"""
crisis_detector.py — Edge crisis detection (keyword + optional LLM)
Zero-dependency crisis detector for offline edge devices.
Tier 0: Keyword matching (instant, any device)
Tier 1: Small LLM via llama.cpp (200ms on Pi 4, optional)
Usage:
python3 crisis_detector.py # Interactive mode
python3 crisis_detector.py --test # Run self-tests
python3 crisis_detector.py --daemon # Headless daemon mode
python3 crisis_detector.py --check "text" # Single check
Part of turboquant #99, #102.
"""
import json
import os
import re
import sys
import time
from pathlib import Path
# ── Paths ───────────────────────────────────────────────────────────────────
SCRIPT_DIR = Path(__file__).resolve().parent
DATA_DIR = SCRIPT_DIR.parent / "data"
RESOURCES_FILE = DATA_DIR / "crisis_resources.json"
# ── Configuration ───────────────────────────────────────────────────────────
KEYWORD_THRESHOLD = 1 # Keywords needed to trigger (1 = high recall)
LLM_THRESHOLD = 0.6 # LLM confidence cutoff
LLM_TIMEOUT_MS = 5000 # Max LLM inference time
LOG_FILE = None # Set by --log flag
# ── Tier 0: Keyword Detection ──────────────────────────────────────────────
# Crisis keywords organized by severity
CRISIS_PATTERNS = {
"immediate_danger": [
r"\bkill(?:ing)?\s*(?:my)?self\b",
r"\b(?:going|gonna|want(?:ing)?)\s*to\s*die\b",
r"\b(?:end|ending)\s+(?:my|it|this)\s*life\b",
r"\bsuicide\b",
r"\bsuicidal\b",
r"\bwanna\s*die\b",
r"\bwant\s*to\s*die\b",
r"\bcan'?t\s+(?:go\s+on|live|take\s+(?:it|this|anymore))\b",
r"\bi'?m\s+(?:done|finished)\b.*\b(?:life|living)\b",
r"\boverdose\b",
r"\bhanging\s+(?:my)?self\b",
r"\bjump(?:ing)?\s+(?:off|from)\b",
r"\bslit(?:ting)?\s+(?:my\s+)?(?:wrists|throat)\b",
r"\bshoot(?:ing)?\s+(?:my)?self\b",
],
"self_harm": [
r"\bself[\s-]?harm(?:ing|ed)?\b",
r"\bcut(?:ting)?\s+(?:my)?self\b",
r"\bhurt(?:ing)?\s+(?:my)?self\b",
r"\bpunish(?:ing)?\s+(?:my)?self\b",
r"\bburn(?:ing)?\s+(?:my)?self\b",
r"\bscar(?:ring)?\s+(?:my)?self\b",
],
"hopelessness": [
r"\bhopeless\b",
r"\bno\s+(?:point|reason|purpose)\b",
r"\bwhy\s+(?:bother|try|am\s+i\s+here)\b",
r"\bnobody\s+(?:cares|would\s+(?:miss|notice))\b",
r"\bbeen\s+better\s+off\s+(?:dead|gone)\b",
r"\bwouldn'?t\s+(?:miss|care)\b.*\b(?:if|when)\b.*\bdie\b",
r"\bnothing\s+(?:matters|left)\b",
r"\bgive\s+(?:up|me\s+death)\b",
],
"crisis_language": [
r"\b(?:i|can'?t)\s+(?:handle|deal\s+with)\s+(?:this|it|anymore)\b",
r"\btoo\s+much\s+(?:pain|suffering)\b",
r"\bcan'?t\s+(?:take|stand)\s+(?:this|it|anymore)\b",
r"\bbreak(?:ing|s)?\s+down\b",
r"\b(?:i'?m|am)\s+(?:drowning|suffocating|dying)\b",
r"\bsos\b",
r"\bhelp\s+me\b.*\b(?:please|desperate)\b",
r"\bemergency\b.*\b(?:mental|crisis)\b",
r"\b(?:want|need|wish)(?:ing)?\s+(?:the|this|my)\s+pain\s+to\s+(?:stop|end|go\s+away)\b",
r"\bmake\s+(?:the|this|my)\s+pain\s+(?:stop|end|go\s+away)\b",
],
}
# Compile all patterns
_COMPILED_PATTERNS = {}
for category, patterns in CRISIS_PATTERNS.items():
_COMPILED_PATTERNS[category] = [re.compile(p, re.IGNORECASE) for p in patterns]
def detect_keywords(text: str) -> dict:
"""
Tier 0 keyword detection. Returns match info.
Result: {
"detected": bool,
"confidence": float (0-1),
"categories": list[str],
"matches": list[str]
}
"""
matches = []
categories = set()
for category, patterns in _COMPILED_PATTERNS.items():
for pattern in patterns:
m = pattern.search(text)
if m:
matches.append(m.group(0))
categories.add(category)
detected = len(matches) >= KEYWORD_THRESHOLD
# Confidence heuristic
if not detected:
confidence = 0.0
elif "immediate_danger" in categories:
confidence = 0.95
elif "self_harm" in categories:
confidence = 0.85
elif "hopelessness" in categories:
confidence = 0.7
else:
confidence = 0.6
return {
"detected": detected,
"confidence": confidence,
"categories": sorted(categories),
"matches": matches,
"method": "keyword",
}
# ── Tier 1: LLM Detection (Optional) ───────────────────────────────────────
def detect_llm(text: str, model_path: str = None) -> dict:
"""
Tier 1 LLM detection via llama.cpp subprocess.
Falls back gracefully if model not available.
"""
if model_path is None:
default_path = SCRIPT_DIR.parent / "models" / "falcon-h1-tiny-90m-q4km.gguf"
model_path = str(default_path)
if not os.path.exists(model_path):
return {
"detected": False,
"confidence": 0.0,
"method": "llm_unavailable",
"reason": "Model not found at " + model_path,
}
# Build llama-cli command
llama_bin = SCRIPT_DIR.parent / "llama.cpp" / "llama-cli"
if not llama_bin.exists():
llama_bin = SCRIPT_DIR.parent / "llama.cpp" / "build" / "bin" / "llama-cli"
if not llama_bin.exists():
return {
"detected": False,
"confidence": 0.0,
"method": "llm_unavailable",
"reason": "llama-cli binary not found",
}
prompt = (
"You are a crisis detection system. Analyze the following text and "
"determine if the person is in psychological crisis or expressing "
"suicidal ideation.\n\n"
f"Text: \"{text}\"\n\n"
"Respond with ONLY one word: CRISIS or SAFE\n"
)
try:
import subprocess
result = subprocess.run(
[
str(llama_bin),
"-m", model_path,
"-p", prompt,
"-n", "5",
"--temp", "0.1",
"--no-display-prompt",
],
capture_output=True,
text=True,
timeout=LLM_TIMEOUT_MS / 1000,
)
output = result.stdout.strip().upper()
if "CRISIS" in output:
return {
"detected": True,
"confidence": 0.8,
"method": "llm",
"raw_output": output[:100],
}
else:
return {
"detected": False,
"confidence": 0.9,
"method": "llm",
"raw_output": output[:100],
}
except Exception as e:
return {
"detected": False,
"confidence": 0.0,
"method": "llm_error",
"reason": str(e),
}
# ── Combined Detection ──────────────────────────────────────────────────────
def detect_crisis(text: str, use_llm: bool = True) -> dict:
"""
Full crisis detection pipeline: keyword first, then LLM if available.
"""
kw_result = detect_keywords(text)
if kw_result["detected"]:
return kw_result
if use_llm:
llm_result = detect_llm(text)
if llm_result["detected"]:
return llm_result
return {
"detected": False,
"confidence": 0.95,
"categories": [],
"matches": [],
"method": "keyword+llm",
}
# ── Resource Display ────────────────────────────────────────────────────────
def load_resources() -> dict:
"""Load offline crisis resources."""
if RESOURCES_FILE.exists():
with open(RESOURCES_FILE) as f:
return json.load(f)
return {
"national_resources": [{
"name": "988 Suicide & Crisis Lifeline",
"phone": "988",
"description": "Call or text 988 — free, confidential, 24/7",
}],
"local_resources": [],
}
def display_resources(result: dict) -> str:
"""Format crisis resources for display."""
resources = load_resources()
lines = []
lines.append("=" * 50)
lines.append(" CRISIS RESOURCES — You are not alone")
lines.append("=" * 50)
lines.append("")
for r in resources.get("national_resources", []):
lines.append(f" {r['name']}")
lines.append(f" Phone: {r['phone']}")
if r.get("description"):
lines.append(f" {r['description']}")
lines.append("")
for r in resources.get("local_resources", []):
lines.append(f" {r['name']}")
if r.get("phone"):
lines.append(f" Phone: {r['phone']}")
if r.get("address"):
lines.append(f" Address: {r['address']}")
if r.get("hours"):
lines.append(f" Hours: {r['hours']}")
lines.append("")
lines.append("-" * 50)
lines.append(" Detection: " + result.get("method", "keyword"))
lines.append(" Confidence: " + str(int(result.get("confidence", 0) * 100)) + "%")
if result.get("categories"):
lines.append(" Categories: " + ", ".join(result["categories"]))
lines.append("=" * 50)
return "\n".join(lines)
# ── CLI Interface ───────────────────────────────────────────────────────────
def run_tests():
"""Run self-tests."""
from tests.test_edge_crisis import run_all_tests
return run_all_tests()
def run_check(text: str):
"""Single text check."""
result = detect_crisis(text, use_llm=False)
if result["detected"]:
print(display_resources(result))
else:
print("SAFE — no crisis indicators detected")
return result
def run_interactive():
"""Interactive mode — read lines from stdin, detect crisis."""
resources = load_resources()
print("Edge Crisis Detector (Ctrl+C to exit)")
print("Type a message and press Enter to check.\n")
try:
while True:
try:
text = input("> ").strip()
except EOFError:
break
if not text:
continue
result = detect_crisis(text, use_llm=False)
if result["detected"]:
print("\n" + display_resources(result) + "\n")
else:
print(" [safe]")
except KeyboardInterrupt:
print("\nExiting.")
def run_daemon():
"""Daemon mode — read from a named pipe or stdin, output results."""
import select
print("Edge Crisis Detector — daemon mode")
print("Reading from stdin. Pipe text to detect.\n")
while True:
try:
line = sys.stdin.readline()
if not line:
break
text = line.strip()
if not text:
continue
result = detect_crisis(text, use_llm=False)
if result["detected"]:
output = json.dumps({"crisis": True, "result": result, "resources": load_resources()})
print(output, flush=True)
else:
print(json.dumps({"crisis": False}), flush=True)
except KeyboardInterrupt:
break
def main():
if "--test" in sys.argv:
success = run_tests()
sys.exit(0 if success else 1)
elif "--check" in sys.argv:
idx = sys.argv.index("--check")
if idx + 1 < len(sys.argv):
text = " ".join(sys.argv[idx + 1:])
run_check(text)
else:
print("Usage: crisis_detector.py --check 'text to check'")
sys.exit(1)
elif "--daemon" in sys.argv:
run_daemon()
else:
run_interactive()
if __name__ == "__main__":
main()

3
tests/conftest.py Normal file
View File

@@ -0,0 +1,3 @@
"""Pytest configuration for turboquant."""
import sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))

View File

@@ -1,209 +0,0 @@
#!/usr/bin/env python3
"""
test_edge_crisis.py — Offline crisis detection tests
Tests run with zero dependencies and no network access.
Verifies keyword detection accuracy and resource loading.
Usage:
python3 tests/test_edge_crisis.py
python3 -m pytest tests/test_edge_crisis.py -v
"""
import json
import os
import sys
from pathlib import Path
# Add parent to path for imports
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from scripts.crisis_detector import detect_keywords, detect_crisis, load_resources
# ── Test Data ───────────────────────────────────────────────────────────────
# Texts that SHOULD trigger crisis detection
CRISIS_TEXTS = [
"I want to kill myself",
"I'm going to die tonight",
"I can't take this anymore, I want to end it",
"Nobody would miss me if I were gone",
"I've been cutting myself to feel something",
"There's no point in living anymore",
"I'm suicidal and I need help",
"I just want the pain to stop forever",
"I've been thinking about hanging myself",
"Please help me I'm drowning in this",
"I give up. Nothing matters anymore.",
"I want to jump off the bridge",
"I can't go on like this",
"Help me I'm desperate",
"I'm done with life",
]
# Texts that should NOT trigger (common false positives)
SAFE_TEXTS = [
"What a kill in that game!",
"I'm dying of laughter",
"This movie killed me",
"I want to kill it at my presentation",
"I'm dead tired after work",
"The jump scare got me",
"I cut my finger cooking",
"This game is killing me (hard level)",
"The point of the story is unclear",
"I'm done with this homework",
"No point in arguing about it",
"The bridge looks beautiful at sunset",
"I gave up sugar for Lent",
"The knife is in the drawer",
"She's hanging the picture on the wall",
]
def test_crisis_texts_detected():
"""All crisis texts must be detected."""
failures = []
for text in CRISIS_TEXTS:
result = detect_keywords(text)
if not result["detected"]:
failures.append(f"MISSED: '{text}'")
assert not failures, "Crisis texts not detected:\n" + "\n".join(failures)
def test_safe_texts_not_detected():
"""Safe texts must not trigger false positives."""
failures = []
for text in SAFE_TEXTS:
result = detect_keywords(text)
if result["detected"]:
failures.append(f"FALSE POSITIVE: '{text}' -> {result['categories']}")
# Allow some false positives — crisis detection should err on side of caution
false_positive_rate = len(failures) / len(SAFE_TEXTS)
assert false_positive_rate < 0.3, (
f"False positive rate too high ({false_positive_rate:.0%}):\n"
+ "\n".join(failures)
)
def test_immediate_danger_high_confidence():
"""Immediate danger keywords must have high confidence."""
for text in ["I want to kill myself", "I'm going to die tonight", "I'm suicidal"]:
result = detect_keywords(text)
assert result["detected"], f"Failed to detect: '{text}'"
assert result["confidence"] >= 0.8, (
f"Low confidence for immediate danger: {result['confidence']}"
)
def test_resources_load():
"""Resources file must exist and be valid."""
resources = load_resources()
assert "national_resources" in resources
assert len(resources["national_resources"]) >= 1
# 988 lifeline must be present
phones = [r.get("phone", "") for r in resources["national_resources"]]
assert any("988" in p for p in phones), "988 Lifeline not in resources"
def test_resources_have_required_fields():
"""All national resources must have name and contact method."""
resources = load_resources()
for r in resources["national_resources"]:
assert "name" in r, f"Resource missing name: {r}"
has_contact = r.get("phone") or r.get("text") or r.get("url")
assert has_contact, f"Resource missing contact: {r['name']}"
def test_keyword_categories():
"""Verify all keyword categories are represented."""
for text, expected_cats in [
("I want to kill myself", ["immediate_danger"]),
("I've been cutting myself", ["self_harm"]),
("There's no point in living", ["hopelessness"]),
]:
result = detect_keywords(text)
assert result["detected"], f"Should detect: '{text}'"
for cat in expected_cats:
assert cat in result["categories"], (
f"Expected category '{cat}' for '{text}', got {result['categories']}"
)
def test_empty_text_safe():
"""Empty text must not trigger."""
result = detect_keywords("")
assert not result["detected"]
assert result["confidence"] == 0.0
def test_detect_crisis_combined():
"""Combined detect_crisis function works (keyword-only, no LLM)."""
result = detect_crisis("I want to kill myself", use_llm=False)
assert result["detected"]
result2 = detect_crisis("Nice weather today", use_llm=False)
assert not result2["detected"]
def test_resource_file_exists():
"""The resources JSON file must exist."""
resources_file = Path(__file__).resolve().parent.parent / "data" / "crisis_resources.json"
assert resources_file.exists(), f"Missing: {resources_file}"
def test_resources_json_valid():
"""Resources file must be valid JSON with expected structure."""
resources_file = Path(__file__).resolve().parent.parent / "data" / "crisis_resources.json"
with open(resources_file) as f:
data = json.load(f)
assert "version" in data
assert "national_resources" in data
assert "self_help_prompts" in data
assert len(data["national_resources"]) >= 3
# ── Runner ──────────────────────────────────────────────────────────────────
def run_all_tests():
"""Run all tests without pytest."""
tests = [
test_crisis_texts_detected,
test_safe_texts_not_detected,
test_immediate_danger_high_confidence,
test_resources_load,
test_resources_have_required_fields,
test_keyword_categories,
test_empty_text_safe,
test_detect_crisis_combined,
test_resource_file_exists,
test_resources_json_valid,
]
passed = 0
failed = 0
for test in tests:
name = test.__name__
try:
test()
print(f" PASS: {name}")
passed += 1
except AssertionError as e:
print(f" FAIL: {name}")
print(f" {e}")
failed += 1
except Exception as e:
print(f" ERROR: {name}: {e}")
failed += 1
print(f"\n{'='*50}")
print(f"Results: {passed} passed, {failed} failed, {passed+failed} total")
print(f"{'='*50}")
return failed == 0
if __name__ == "__main__":
success = run_all_tests()
sys.exit(0 if success else 1)

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
"""Tests for hardware_optimizer compatibility shim."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from evolution import hardware_optimizer, quant_selector
def test_hardware_optimizer_reexports_quant_selector_api():
assert hardware_optimizer.select_quant_level is quant_selector.select_quant_level
assert hardware_optimizer.detect_hardware is quant_selector.detect_hardware
assert hardware_optimizer.HardwareInfo is quant_selector.HardwareInfo
assert hardware_optimizer.QuantSelection is quant_selector.QuantSelection
def test_hardware_optimizer_exports_quant_level_definitions():
assert hardware_optimizer.QUANT_LEVELS is quant_selector.QUANT_LEVELS
assert hardware_optimizer.QuantLevel is quant_selector.QuantLevel

View File

@@ -0,0 +1,163 @@
#!/usr/bin/env python3
"""Tests for quant_selector.py"""
import sys
import os
import pytest
from unittest.mock import patch, MagicMock
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from evolution.quant_selector import (
QuantLevel,
HardwareInfo,
QUANT_LEVELS,
detect_hardware,
estimate_kv_cache_gb,
estimate_model_memory_gb,
select_quant_level,
)
class TestQuantLevels:
def test_levels_ordered_by_quality(self):
"""Levels should be ordered from best quality to most aggressive."""
for i in range(len(QUANT_LEVELS) - 1):
assert QUANT_LEVELS[i].bits_per_channel > QUANT_LEVELS[i + 1].bits_per_channel
def test_all_levels_have_required_fields(self):
for level in QUANT_LEVELS:
assert level.name
assert level.bits_per_channel > 0
assert level.compression_ratio > 1
assert level.quality_label
assert level.layer_adaptive >= 0
assert level.kv_type
class TestKVEstimate:
def test_basic_estimate(self):
# 48 layers, 8 heads, 128 dim, 32K context, 3.5 bits
kv_gb = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
assert kv_gb > 0
assert kv_gb < 10 # Should be reasonable
def test_longer_context_larger(self):
kv_32k = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
kv_128k = estimate_kv_cache_gb(131072, 48, 8, 128, 3.5)
assert kv_128k > kv_32k
def test_higher_bits_larger(self):
kv_4b = estimate_kv_cache_gb(32768, 48, 8, 128, 4.0)
kv_2b = estimate_kv_cache_gb(32768, 48, 8, 128, 2.0)
assert kv_4b > kv_2b
class TestHardwareDetection:
def test_detect_returns_info(self):
hw = detect_hardware()
assert hw.total_memory_gb > 0
assert hw.available_memory_gb > 0
assert hw.detection_method
@patch("evolution.quant_selector.platform.system", return_value="Linux")
@patch("builtins.open", create=True)
def test_linux_detection(self, mock_open, mock_system):
mock_open.return_value.__enter__().read.return_value = (
"MemTotal: 32000000 kB\n"
"MemAvailable: 24000000 kB\n"
)
hw = _detect_linux_fallback()
assert hw.total_memory_gb > 20
def _detect_linux_fallback():
"""Helper to test Linux detection with mocked /proc/meminfo."""
from evolution.quant_selector import _detect_linux
return _detect_linux()
class TestSelection:
def test_selects_turbo4_for_large_memory(self):
"""With plenty of memory, should pick turbo4 (best quality)."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
gpu_memory_gb=64,
gpu_name="Test GPU",
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert sel.level.name == "turbo4"
assert sel.headroom_gb > 0
def test_selects_smaller_for_tight_memory(self):
"""With tight memory, should pick a smaller quant."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=16,
available_memory_gb=12,
gpu_memory_gb=16,
gpu_name="Test GPU",
cpu_cores=8,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=131072)
# Should pick a smaller quant for 128K context on 16GB
assert sel.level.bits_per_channel <= 4.0
def test_preferred_level(self):
"""User can force a specific level."""
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(
model_size_gb=14.0, context_length=32768,
preferred_level="turbo2"
)
assert sel.level.name == "turbo2"
def test_env_vars_populated(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=64,
available_memory_gb=48,
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert "TURBO_LAYER_ADAPTIVE" in sel.env_vars
assert "-ctk" in sel.server_flags
assert "-ctv" in sel.server_flags
def test_warnings_on_low_headroom(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=18,
available_memory_gb=14,
gpu_memory_gb=18,
gpu_name="Test GPU",
cpu_cores=8,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=16.0, context_length=65536)
assert len(sel.warnings) > 0
def test_reasoning_contains_key_info(self):
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
mock_hw.return_value = HardwareInfo(
total_memory_gb=32,
available_memory_gb=24,
is_apple_silicon=True,
chip_name="M4 Max",
cpu_cores=16,
detection_method="mock",
)
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
assert "turbo4" in sel.reasoning
assert "M4 Max" in sel.reasoning or "32GB" in sel.reasoning

View File

@@ -0,0 +1,338 @@
"""
Integration test: turboquant compressed model passes hermes tool calls (issue #82).
Validates that a TurboQuant-compressed model can:
1. Parse hermes tool schemas correctly
2. Format tool calls in OpenAI-compatible format
3. Pass through the hermes agent conversation loop
Tests are structured as contract tests -- they validate the schema/format
compatibility without requiring a running model server. The live inference
test is skipped by default (requires llama-server with TurboQuant model).
Usage:
pytest tests/test_tool_call_integration.py -v
pytest tests/test_tool_call_integration.py -v -k live # run live test if server available
"""
import json
import os
import pathlib
import re
import unittest
import pytest
ROOT = pathlib.Path(__file__).resolve().parents[1]
PROFILE_PATH = ROOT / "profiles" / "hermes-profile-gemma4-turboquant.yaml"
BENCHMARKS_DIR = ROOT / "benchmarks"
class TestHermesProfileSchema(unittest.TestCase):
"""Validate the hermes profile YAML has required fields for tool calling."""
@classmethod
def setUpClass(cls):
import yaml
cls.profile = yaml.safe_load(PROFILE_PATH.read_text())
def test_profile_has_providers(self):
assert "providers" in self.profile, "Profile must define providers"
assert "primary" in self.profile["providers"], "Must have primary provider"
def test_primary_provider_has_endpoint(self):
primary = self.profile["providers"]["primary"]
assert "endpoint" in primary, "Primary provider must have endpoint"
assert primary["endpoint"].startswith("http"), "Endpoint must be HTTP(S) URL"
def test_primary_provider_has_api_path(self):
primary = self.profile["providers"]["primary"]
assert "api_path" in primary, "Primary provider must have api_path"
assert "/chat/completions" in primary["api_path"], (
"api_path should be OpenAI-compatible /chat/completions"
)
def test_turboquant_settings_present(self):
primary = self.profile["providers"]["primary"]
assert "turboquant" in primary, "Must have turboquant config section"
tq = primary["turboquant"]
assert tq.get("enabled") is True, "TurboQuant must be enabled"
assert tq.get("kv_type") in ("turbo2", "turbo3", "turbo4"), (
"kv_type must be turbo2, turbo3, or turbo4"
)
def test_context_window_configured(self):
primary = self.profile["providers"]["primary"]
assert "context" in primary, "Must have context config"
ctx = primary["context"]
assert ctx.get("max_tokens", 0) >= 8192, (
"max_tokens should be >= 8192 for TurboQuant value proposition"
)
class TestToolSchemaCompatibility(unittest.TestCase):
"""Verify hermes tool schemas serialize to valid JSON for OpenAI tool_calls."""
SAMPLE_TOOL_SCHEMAS = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a text file with line numbers.",
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string", "description": "File path"},
"offset": {"type": "integer", "default": 1},
"limit": {"type": "integer", "default": 500},
},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "execute_code",
"description": "Run a Python script.",
"parameters": {
"type": "object",
"properties": {
"code": {"type": "string", "description": "Python code"},
},
"required": ["code"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web.",
"parameters": {
"type": "object",
"properties": {
"query": {"type": "string"},
"max_results": {"type": "integer", "default": 5},
},
"required": ["query"],
},
},
},
]
def test_tool_schemas_serialize_to_json(self):
"""Tool schemas must serialize without errors."""
serialized = json.dumps(self.SAMPLE_TOOL_SCHEMAS)
assert len(serialized) > 0
parsed = json.loads(serialized)
assert len(parsed) == len(self.SAMPLE_TOOL_SCHEMAS)
def test_tool_schemas_have_required_openai_fields(self):
"""Each tool schema must have the fields OpenAI expects."""
for tool in self.SAMPLE_TOOL_SCHEMAS:
assert tool["type"] == "function", "Tool type must be 'function'"
fn = tool["function"]
assert "name" in fn, "Function must have name"
assert "description" in fn, "Function must have description"
assert "parameters" in fn, "Function must have parameters"
params = fn["parameters"]
assert params["type"] == "object", "Parameters type must be 'object'"
assert "properties" in params, "Parameters must have properties"
def test_tool_call_response_format(self):
"""Verify tool_call response matches OpenAI format."""
tool_call = {
"id": "call_abc123",
"type": "function",
"function": {
"name": "read_file",
"arguments": json.dumps({"path": "/tmp/test.txt"}),
},
}
args = json.loads(tool_call["function"]["arguments"])
assert args["path"] == "/tmp/test.txt"
assert tool_call["function"]["name"] in [
t["function"]["name"] for t in self.SAMPLE_TOOL_SCHEMAS
]
def test_tool_names_are_valid_identifiers(self):
"""Tool names must be valid Python identifiers for hermes dispatch."""
for tool in self.SAMPLE_TOOL_SCHEMAS:
name = tool["function"]["name"]
assert re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name), (
f"Tool name \'{name}\' is not a valid identifier"
)
class TestTurboquantServerConfig(unittest.TestCase):
"""Validate server startup configuration matches hermes profile."""
def test_server_command_has_turboquant_flags(self):
"""The server command in the profile must include -ctk/-ctv flags."""
profile_text = PROFILE_PATH.read_text()
assert "-ctk" in profile_text, "Profile server command must include -ctk flag"
assert "-ctv" in profile_text, "Profile server command must include -ctv flag"
def test_server_command_has_context_flag(self):
"""Server command must set context size."""
profile_text = PROFILE_PATH.read_text()
assert re.search(r"-c\s+\d+", profile_text), (
"Server command must include -c <context_size> flag"
)
def test_layer_adaptive_env_var(self):
"""Profile must set TURBO_LAYER_ADAPTIVE env var."""
profile_text = PROFILE_PATH.read_text()
assert "TURBO_LAYER_ADAPTIVE" in profile_text, (
"Profile must configure TURBO_LAYER_ADAPTIVE"
)
class TestBenchmarkData(unittest.TestCase):
"""Validate benchmark test prompts include tool-call test cases."""
@classmethod
def setUpClass(cls):
prompts_path = BENCHMARKS_DIR / "test_prompts.json"
cls.prompts = json.loads(prompts_path.read_text())
def test_has_tool_call_test_prompt(self):
"""Benchmark prompts must include a tool-call format test."""
categories = [p.get("category") for p in self.prompts]
assert "tool_call_format" in categories, (
"Benchmark must include a tool_call_format test case"
)
def test_tool_call_prompt_expects_json(self):
"""Tool call test prompt must expect JSON in the response."""
tool_prompt = next(
p for p in self.prompts if p.get("category") == "tool_call_format"
)
pattern = tool_prompt.get("expected_pattern", "")
assert "json" in pattern.lower() or "\\{" in pattern, (
"Tool call prompt must expect JSON-formatted response"
)
@pytest.mark.skipif(
not os.environ.get("TURBOQUANT_SERVER_URL"),
reason="No TurboQuant server available (set TURBOQUANT_SERVER_URL to run)",
)
class TestLiveToolCallIntegration:
"""Live integration test -- requires running llama-server with TurboQuant."""
def test_server_health(self):
"""Server must respond to /v1/models endpoint."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
resp = requests.get(f"{url}/v1/models", timeout=10)
assert resp.status_code == 200
data = resp.json()
assert "data" in data
assert len(data["data"]) > 0
def test_tool_call_completion(self):
"""Model must return a valid tool_call for a read_file prompt."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
}
]
resp = requests.post(
f"{url}/v1/chat/completions",
json={
"model": "gemma-4",
"messages": [
{"role": "user", "content": "Read the file at /tmp/test.txt"}
],
"tools": tools,
"tool_choice": "auto",
},
timeout=120,
)
assert resp.status_code == 200
data = resp.json()
choice = data["choices"][0]
msg = choice["message"]
if "tool_calls" in msg and msg["tool_calls"]:
tc = msg["tool_calls"][0]
assert tc["type"] == "function"
assert tc["function"]["name"] == "read_file"
args = json.loads(tc["function"]["arguments"])
assert "path" in args
else:
assert len(msg.get("content", "")) > 0
def test_tool_call_with_multiple_tools(self):
"""Model must handle multiple available tools."""
import requests
url = os.environ["TURBOQUANT_SERVER_URL"]
tools = [
{
"type": "function",
"function": {
"name": "read_file",
"description": "Read a file",
"parameters": {
"type": "object",
"properties": {"path": {"type": "string"}},
"required": ["path"],
},
},
},
{
"type": "function",
"function": {
"name": "web_search",
"description": "Search the web",
"parameters": {
"type": "object",
"properties": {"query": {"type": "string"}},
"required": ["query"],
},
},
},
{
"type": "function",
"function": {
"name": "execute_code",
"description": "Run Python code",
"parameters": {
"type": "object",
"properties": {"code": {"type": "string"}},
"required": ["code"],
},
},
},
]
resp = requests.post(
f"{url}/v1/chat/completions",
json={
"model": "gemma-4",
"messages": [
{"role": "user", "content": "Search the web for 'bitcoin price'"}
],
"tools": tools,
"tool_choice": "auto",
},
timeout=120,
)
assert resp.status_code == 200
data = resp.json()
assert "choices" in data
assert len(data["choices"]) > 0
if __name__ == "__main__":
unittest.main()