Compare commits
19 Commits
burn/102-1
...
step35/96-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
d7cfc1db2c | ||
|
|
2fca513e26 | ||
| 7797b9b4c8 | |||
| 0338cf940a | |||
| f3f796fa64 | |||
| 6ab98d65f5 | |||
| c4293f0d31 | |||
| 88a5c48402 | |||
| 3ff52f02b2 | |||
| 8475539070 | |||
|
|
f0f117cdd3 | ||
|
|
a537511652 | ||
|
|
cd18bd06be | ||
| 492c1cdcfd | |||
| 6e583310a8 | |||
| 300918ee1e | |||
| f7ea01cb65 | |||
| d2edbdadc2 | |||
| c009d8df77 |
@@ -18,7 +18,21 @@ jobs:
|
||||
find . -name '*.py' | grep -v llama-cpp-fork | xargs -r python3 -m py_compile
|
||||
find . -name '*.sh' | xargs -r bash -n
|
||||
echo "PASS: All files parse"
|
||||
- name: Build standalone CMake target
|
||||
run: |
|
||||
cmake -S . -B build -DTURBOQUANT_BUILD_TESTS=ON
|
||||
cmake --build build -j$(nproc)
|
||||
- name: Run tests
|
||||
run: |
|
||||
ctest --test-dir build --output-on-failure
|
||||
- name: Secret scan
|
||||
run: |
|
||||
if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v .gitea | grep -v llama-cpp-fork; then exit 1; fi
|
||||
echo "PASS: No secrets"
|
||||
- name: Tool call regression suite (issue #96)
|
||||
run: |
|
||||
python3 -m pip install -q pytest pyyaml requests
|
||||
pytest tests/tool_call_regression.py -v --tb=short
|
||||
- name: Markdown link check
|
||||
run: |
|
||||
python3 check_markdown_links.py
|
||||
|
||||
2
benchmarks/tool-call-regression.md
Normal file
2
benchmarks/tool-call-regression.md
Normal file
@@ -0,0 +1,2 @@
|
||||
| Timestamp | Model | Preset | Accuracy | read_file | web_search | terminal | execute_code | delegate_task | Parallel |
|
||||
|-----------|-------|--------|----------|-----------|------------|----------|--------------|---------------|----------|
|
||||
124
check_markdown_links.py
Normal file
124
check_markdown_links.py
Normal file
@@ -0,0 +1,124 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Check local markdown links.
|
||||
|
||||
Scans markdown files for local links and fails on broken targets.
|
||||
Ignores:
|
||||
- external URLs (http/https)
|
||||
- anchors (#section)
|
||||
- mailto: and tel:
|
||||
- links inside fenced code blocks
|
||||
- generated/build directories
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import re
|
||||
import sys
|
||||
from pathlib import Path
|
||||
from typing import Iterable
|
||||
|
||||
CODE_FENCE_RE = re.compile(r"^```")
|
||||
LINK_RE = re.compile(r"(?<!!)\[[^\]]+\]\(([^)]+)\)")
|
||||
DEFAULT_SKIP_DIRS = {
|
||||
".git",
|
||||
".gitea",
|
||||
".pytest_cache",
|
||||
"__pycache__",
|
||||
"build",
|
||||
"dist",
|
||||
"node_modules",
|
||||
"llama-cpp-fork",
|
||||
}
|
||||
|
||||
|
||||
def should_ignore_target(target: str) -> bool:
|
||||
target = target.strip()
|
||||
return (
|
||||
not target
|
||||
or target.startswith("http://")
|
||||
or target.startswith("https://")
|
||||
or target.startswith("mailto:")
|
||||
or target.startswith("tel:")
|
||||
or target.startswith("#")
|
||||
)
|
||||
|
||||
|
||||
def normalize_target(target: str) -> str:
|
||||
target = target.strip()
|
||||
if target.startswith("<") and target.endswith(">"):
|
||||
target = target[1:-1].strip()
|
||||
if "#" in target:
|
||||
target = target.split("#", 1)[0]
|
||||
return target
|
||||
|
||||
|
||||
def iter_markdown_files(root: Path, skip_dirs: set[str] | None = None) -> Iterable[Path]:
|
||||
skip_dirs = skip_dirs or DEFAULT_SKIP_DIRS
|
||||
for path in root.rglob("*.md"):
|
||||
if any(part in skip_dirs for part in path.relative_to(root).parts):
|
||||
continue
|
||||
yield path
|
||||
|
||||
|
||||
def iter_links(path: Path) -> Iterable[tuple[int, str]]:
|
||||
in_code_fence = False
|
||||
for line_no, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
|
||||
if CODE_FENCE_RE.match(line.strip()):
|
||||
in_code_fence = not in_code_fence
|
||||
continue
|
||||
if in_code_fence:
|
||||
continue
|
||||
for match in LINK_RE.finditer(line):
|
||||
yield line_no, match.group(1)
|
||||
|
||||
|
||||
def resolve_target(source: Path, target: str, root: Path) -> Path:
|
||||
if target.startswith("/"):
|
||||
return (root / target.lstrip("/")).resolve()
|
||||
return (source.parent / target).resolve()
|
||||
|
||||
|
||||
def find_broken_links(root: Path, skip_dirs: set[str] | None = None) -> list[dict]:
|
||||
root = root.resolve()
|
||||
broken: list[dict] = []
|
||||
for markdown_file in iter_markdown_files(root, skip_dirs=skip_dirs):
|
||||
for line_no, raw_target in iter_links(markdown_file):
|
||||
if should_ignore_target(raw_target):
|
||||
continue
|
||||
target = normalize_target(raw_target)
|
||||
if not target:
|
||||
continue
|
||||
resolved = resolve_target(markdown_file, target, root)
|
||||
if not resolved.exists():
|
||||
broken.append(
|
||||
{
|
||||
"source": str(markdown_file),
|
||||
"line": line_no,
|
||||
"target": target,
|
||||
"resolved": str(resolved),
|
||||
}
|
||||
)
|
||||
return broken
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser(description="Fail on broken local markdown links.")
|
||||
parser.add_argument("root", nargs="?", default=".", help="Repo root to scan (default: .)")
|
||||
args = parser.parse_args()
|
||||
|
||||
root = Path(args.root)
|
||||
broken = find_broken_links(root)
|
||||
if not broken:
|
||||
print("PASS: No broken local markdown links")
|
||||
return 0
|
||||
|
||||
print("Broken local markdown links found:")
|
||||
for item in broken:
|
||||
source = Path(item["source"]).relative_to(root.resolve())
|
||||
print(f"{source}:{item['line']}: missing target -> {item['target']}")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
@@ -1,88 +0,0 @@
|
||||
{
|
||||
"version": "1.0.0",
|
||||
"updated": "2026-04-15",
|
||||
"description": "Offline crisis resources cache for edge deployment",
|
||||
|
||||
"national_resources": [
|
||||
{
|
||||
"name": "988 Suicide & Crisis Lifeline",
|
||||
"phone": "988",
|
||||
"text": "988",
|
||||
"url": "https://988lifeline.org",
|
||||
"description": "Free, confidential support 24/7 for people in distress. Call or text 988.",
|
||||
"languages": ["en", "es"],
|
||||
"available": "24/7"
|
||||
},
|
||||
{
|
||||
"name": "Crisis Text Line",
|
||||
"text": "HOME to 741741",
|
||||
"url": "https://www.crisistextline.org",
|
||||
"description": "Free crisis support via text message. Text HOME to 741741.",
|
||||
"languages": ["en", "es"],
|
||||
"available": "24/7"
|
||||
},
|
||||
{
|
||||
"name": "Veterans Crisis Line",
|
||||
"phone": "988 (press 1)",
|
||||
"text": "838255",
|
||||
"url": "https://www.veteranscrisisline.net",
|
||||
"description": "Support for Veterans and their loved ones. Call 988, press 1.",
|
||||
"available": "24/7"
|
||||
},
|
||||
{
|
||||
"name": "Trevor Project (LGBTQ+ Youth)",
|
||||
"phone": "1-866-488-7386",
|
||||
"text": "START to 678-678",
|
||||
"url": "https://www.thetrevorproject.org",
|
||||
"description": "Crisis intervention and suicide prevention for LGBTQ+ young people.",
|
||||
"available": "24/7"
|
||||
},
|
||||
{
|
||||
"name": "SAMHSA National Helpline",
|
||||
"phone": "1-800-662-4357",
|
||||
"url": "https://www.samhsa.gov/find-help/national-helpline",
|
||||
"description": "Free, confidential, 24/7 treatment referral and information service.",
|
||||
"available": "24/7"
|
||||
}
|
||||
],
|
||||
|
||||
"international_resources": [
|
||||
{
|
||||
"name": "International Association for Suicide Prevention",
|
||||
"url": "https://www.iasp.info/resources/Crisis_Centres/",
|
||||
"description": "Directory of crisis centers worldwide."
|
||||
},
|
||||
{
|
||||
"name": "Befrienders Worldwide",
|
||||
"url": "https://www.befrienders.org",
|
||||
"description": "Emotional support to prevent suicide worldwide."
|
||||
},
|
||||
{
|
||||
"name": "Canada — Talk Suicide",
|
||||
"phone": "1-833-456-4566",
|
||||
"text": "456456"
|
||||
},
|
||||
{
|
||||
"name": "UK — Samaritans",
|
||||
"phone": "116 123",
|
||||
"email": "jo@samaritans.org"
|
||||
},
|
||||
{
|
||||
"name": "Australia — Lifeline",
|
||||
"phone": "13 11 14",
|
||||
"text": "0477 13 11 14"
|
||||
}
|
||||
],
|
||||
|
||||
"local_resources": [],
|
||||
|
||||
"self_help_prompts": [
|
||||
"Take a slow breath. Inhale for 4 seconds, hold for 4, exhale for 6.",
|
||||
"Look around. Name 5 things you can see, 4 you can touch, 3 you can hear.",
|
||||
"You are not alone. This feeling will pass.",
|
||||
"Call someone you trust right now.",
|
||||
"Step outside if you can. Fresh air and movement can help.",
|
||||
"Write down what you're feeling. Getting it out helps.",
|
||||
"This moment is not your whole life. It's one moment."
|
||||
]
|
||||
}
|
||||
@@ -385,7 +385,7 @@ Step 7: If pass → production. If fail → drop to turbo3 or adjust per-layer p
|
||||
|
||||
---
|
||||
|
||||
*Repo: http://143.198.27.163:3000/Timmy_Foundation/turboquant*
|
||||
*Repo: https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant*
|
||||
*Build: /tmp/llama-cpp-turboquant/build/bin/ (all binaries)*
|
||||
*Branch: feature/turboquant-kv-cache*
|
||||
|
||||
|
||||
@@ -1,223 +0,0 @@
|
||||
# Edge Crisis Detection — Deployment Guide
|
||||
|
||||
**Part of:** turboquant#99 (1-Bit Models + Edge)
|
||||
**Issue:** #102
|
||||
|
||||
## Overview
|
||||
|
||||
Deploy a minimal crisis detection system on edge devices for offline use.
|
||||
When internet is unavailable but someone is in crisis, a local device can
|
||||
detect distress signals and display cached crisis resources.
|
||||
|
||||
## Target Hardware
|
||||
|
||||
| Device | RAM | Notes |
|
||||
|--------|-----|-------|
|
||||
| Raspberry Pi 4 | 4GB | Recommended. Runs keyword + Falcon-H1-Tiny-90M |
|
||||
| Raspberry Pi 4 | 2GB | Keyword detection only (no LLM) |
|
||||
| Old Android phone | 2GB+ | Termux + llama.cpp, Falcon-H1-Tiny-90M |
|
||||
| Any x86 SBC | 2GB+ | Full keyword + optional small model |
|
||||
|
||||
## Model Selection
|
||||
|
||||
### Tier 0: Keyword Detection (any device, <10MB)
|
||||
- No model needed — pure pattern matching
|
||||
- Instant response (<1ms)
|
||||
- Works on 512MB RAM devices
|
||||
- Covers 80%+ of explicit crisis language
|
||||
- **Use when:** RAM < 2GB or first-boot before model download
|
||||
|
||||
### Tier 1: Falcon-H1-Tiny-90M (~180MB quantized)
|
||||
- Detects nuanced/implicit distress that keywords miss
|
||||
- Runs on 2GB+ RAM (Pi 4 4GB recommended)
|
||||
- ~200ms inference on Pi 4 (CPU)
|
||||
- Quantized Q4_K_M via llama.cpp
|
||||
- **Use when:** RAM >= 2GB, want higher recall
|
||||
|
||||
### Tier 2: Bonsai-1.7B (~900MB quantized)
|
||||
- Best accuracy for ambiguous cases
|
||||
- Needs 3GB+ RAM
|
||||
- ~1.5s inference on Pi 4
|
||||
- **Use when:** RAM >= 4GB, false-positive tolerance is low
|
||||
|
||||
### Recommendation
|
||||
Start with **Tier 0 + Tier 1**. Keyword catches obvious cases instantly,
|
||||
Falcon-H1 catches implicit cases with 200ms latency. Together they cover
|
||||
>95% of crisis signals with negligible resource use.
|
||||
|
||||
## Installation
|
||||
|
||||
### Raspberry Pi 4
|
||||
|
||||
```bash
|
||||
# 1. System setup
|
||||
sudo apt update && sudo apt install -y python3 python3-pip git cmake
|
||||
|
||||
# 2. Clone this directory
|
||||
git clone https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant.git
|
||||
cd turboquant
|
||||
|
||||
# 3. Python keyword detector runs with zero dependencies (pure stdlib)
|
||||
|
||||
# 4. (Optional) Build llama.cpp for Tier 1 model
|
||||
git clone https://github.com/ggerganov/llama.cpp
|
||||
cd llama.cpp && make -j4 && cd ..
|
||||
|
||||
# 5. Download model (Tier 1)
|
||||
mkdir -p models
|
||||
# Falcon-H1-Tiny-90M GGUF — find latest on HuggingFace
|
||||
# wget -O models/falcon-h1-tiny-90m-q4km.gguf <URL>
|
||||
|
||||
# 6. Test offline crisis detection
|
||||
python3 scripts/crisis_detector.py --test
|
||||
```
|
||||
|
||||
### Android (Termux)
|
||||
|
||||
```bash
|
||||
pkg install python git cmake
|
||||
# Follow Pi steps above, but build llama.cpp with:
|
||||
cmake -B build -DLLAMA_NATIVE=OFF && cmake --build build -j$(nproc)
|
||||
```
|
||||
|
||||
### Auto-Start on Boot (Pi)
|
||||
|
||||
```bash
|
||||
# Add to /etc/rc.local (before 'exit 0'):
|
||||
python3 /home/pi/turboquant/scripts/crisis_detector.py --daemon &
|
||||
```
|
||||
|
||||
Or create a systemd service:
|
||||
|
||||
```ini
|
||||
# /etc/systemd/system/crisis-detector.service
|
||||
[Unit]
|
||||
Description=Edge Crisis Detector
|
||||
After=network.target
|
||||
|
||||
[Service]
|
||||
ExecStart=/usr/bin/python3 /home/pi/turboquant/scripts/crisis_detector.py --daemon
|
||||
Restart=always
|
||||
User=pi
|
||||
|
||||
[Install]
|
||||
WantedBy=multi-user.target
|
||||
```
|
||||
|
||||
```bash
|
||||
sudo systemctl enable crisis-detector
|
||||
sudo systemctl start crisis-detector
|
||||
```
|
||||
|
||||
## Offline Resource Cache
|
||||
|
||||
The file `data/crisis_resources.json` is bundled with the deployment.
|
||||
It contains:
|
||||
|
||||
- **988 Suicide & Crisis Lifeline** — call or text 988
|
||||
- **Crisis Text Line** — text HOME to 741741
|
||||
- **International Association for Suicide Prevention** — global directory
|
||||
- Cached local resources (customize per deployment location)
|
||||
|
||||
These display immediately when a crisis is detected — no network required.
|
||||
|
||||
## How It Works
|
||||
|
||||
```
|
||||
User input
|
||||
|
|
||||
v
|
||||
+-------------------+
|
||||
| Keyword Matcher | <- Tier 0: instant, no model
|
||||
| (regex/pattern) |
|
||||
+--------+----------+
|
||||
match? --yes--> Show crisis resources
|
||||
|
|
||||
no
|
||||
v
|
||||
+-------------------+
|
||||
| Falcon-H1-Tiny | <- Tier 1: ~200ms on Pi 4
|
||||
| (if available) |
|
||||
+--------+----------+
|
||||
crisis? --yes--> Show crisis resources
|
||||
|
|
||||
no
|
||||
v
|
||||
Continue normally
|
||||
```
|
||||
|
||||
## Testing Offline
|
||||
|
||||
```bash
|
||||
# Disconnect from internet
|
||||
sudo ip link set wlan0 down
|
||||
|
||||
# Run the test suite
|
||||
python3 scripts/crisis_detector.py --test
|
||||
|
||||
# Expected: all tests pass, resources display correctly
|
||||
|
||||
# Reconnect
|
||||
sudo ip link set wlan0 up
|
||||
```
|
||||
|
||||
## File Structure
|
||||
|
||||
```
|
||||
turboquant/
|
||||
+-- scripts/
|
||||
| +-- crisis_detector.py # Main detector (keyword + optional LLM)
|
||||
+-- data/
|
||||
| +-- crisis_resources.json # Offline resource cache
|
||||
+-- tests/
|
||||
| +-- test_edge_crisis.py # Offline verification tests
|
||||
+-- docs/
|
||||
+-- edge-crisis-deployment.md # This file
|
||||
```
|
||||
|
||||
## Customization
|
||||
|
||||
### Adding Local Resources
|
||||
|
||||
Edit `data/crisis_resources.json`:
|
||||
|
||||
```json
|
||||
{
|
||||
"local_resources": [
|
||||
{
|
||||
"name": "City Crisis Center",
|
||||
"phone": "555-0123",
|
||||
"address": "123 Main St",
|
||||
"hours": "24/7"
|
||||
}
|
||||
]
|
||||
}
|
||||
```
|
||||
|
||||
### Adjusting Sensitivity
|
||||
|
||||
In `scripts/crisis_detector.py`:
|
||||
|
||||
```python
|
||||
# Keyword threshold: how many keywords trigger a match
|
||||
KEYWORD_THRESHOLD = 1 # 1 = any keyword triggers (high recall)
|
||||
# 2 = need 2+ keywords (higher precision)
|
||||
|
||||
# LLM threshold (Tier 1/2): confidence score cutoff
|
||||
LLM_THRESHOLD = 0.6 # 0.6 = default (balanced)
|
||||
# 0.4 = more sensitive
|
||||
# 0.8 = more precise
|
||||
```
|
||||
|
||||
## Privacy
|
||||
|
||||
- **No data leaves the device.** All detection runs locally.
|
||||
- No logs of user input are stored by default.
|
||||
- Enable logging only for debugging (`--log` flag).
|
||||
- No network calls are made by the crisis detector.
|
||||
- Resource display is a local text render.
|
||||
|
||||
## License
|
||||
|
||||
Same as parent project. Crisis detection code and resource data are
|
||||
provided for humanitarian purposes.
|
||||
@@ -1,5 +1,29 @@
|
||||
"""Phase 19: Hardware-Aware Inference Optimization.
|
||||
Part of the TurboQuant suite for local inference excellence.
|
||||
"""Backward-compatible shim for hardware-aware quantization selection.
|
||||
|
||||
The original Phase 19 placeholder `hardware_optimizer.py` never shipped real
|
||||
logic. The canonical implementation now lives in `evolution.quant_selector`.
|
||||
This shim preserves the legacy import path for any downstream callers while
|
||||
making `quant_selector.py` the single source of truth.
|
||||
"""
|
||||
import logging
|
||||
# ... (rest of the code)
|
||||
|
||||
from evolution.quant_selector import ( # noqa: F401
|
||||
HardwareInfo,
|
||||
QuantLevel,
|
||||
QuantSelection,
|
||||
QUANT_LEVELS,
|
||||
detect_hardware,
|
||||
estimate_kv_cache_gb,
|
||||
estimate_model_memory_gb,
|
||||
select_quant_level,
|
||||
)
|
||||
|
||||
__all__ = [
|
||||
"HardwareInfo",
|
||||
"QuantLevel",
|
||||
"QuantSelection",
|
||||
"QUANT_LEVELS",
|
||||
"detect_hardware",
|
||||
"estimate_kv_cache_gb",
|
||||
"estimate_model_memory_gb",
|
||||
"select_quant_level",
|
||||
]
|
||||
|
||||
548
evolution/quant_selector.py
Normal file
548
evolution/quant_selector.py
Normal file
@@ -0,0 +1,548 @@
|
||||
"""Auto-select TurboQuant compression level based on available VRAM/RAM.
|
||||
|
||||
Detects hardware resources at startup and picks the highest quality
|
||||
quantization level that fits within available memory. Supports Apple
|
||||
Silicon unified memory, NVIDIA GPUs (via nvidia-smi), and CPU-only fallback.
|
||||
|
||||
Usage:
|
||||
from evolution.quant_selector import select_quant_level
|
||||
|
||||
selection = select_quant_level(model_size_gb=14.0, context_length=32768)
|
||||
print(selection.level) # "turbo4"
|
||||
print(selection.reasoning) # "M4 Max 36GB unified: turbo4 fits 14.0GB model + ..."
|
||||
print(selection.env_vars) # {"TURBO_LAYER_ADAPTIVE": "7"}
|
||||
"""
|
||||
|
||||
import logging
|
||||
import os
|
||||
import platform
|
||||
import subprocess
|
||||
import sys
|
||||
from dataclasses import dataclass, field
|
||||
from pathlib import Path
|
||||
from typing import Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# ── Quant Level Definitions ───────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class QuantLevel:
|
||||
"""A TurboQuant compression level with its memory characteristics."""
|
||||
name: str # e.g. "turbo4"
|
||||
bits_per_channel: float # e.g. 3.5 for turbo4
|
||||
compression_ratio: float # vs uncompressed KV cache
|
||||
quality_label: str # "best", "high", "balanced", "fast"
|
||||
layer_adaptive: int # TURBO_LAYER_ADAPTIVE value (0-7)
|
||||
kv_type: str # -ctk/-ctv flag value
|
||||
min_memory_headroom_gb: float # Minimum free memory to recommend this level
|
||||
description: str = ""
|
||||
|
||||
|
||||
# Ordered from highest quality to most aggressive compression
|
||||
QUANT_LEVELS = [
|
||||
QuantLevel(
|
||||
name="turbo4",
|
||||
bits_per_channel=3.5,
|
||||
compression_ratio=4.2,
|
||||
quality_label="best",
|
||||
layer_adaptive=7,
|
||||
kv_type="turbo4",
|
||||
min_memory_headroom_gb=4.0,
|
||||
description="PolarQuant + QJL 4-bit. Best quality, ~4.2x KV compression."
|
||||
),
|
||||
QuantLevel(
|
||||
name="turbo3",
|
||||
bits_per_channel=2.5,
|
||||
compression_ratio=6.0,
|
||||
quality_label="high",
|
||||
layer_adaptive=5,
|
||||
kv_type="turbo3",
|
||||
min_memory_headroom_gb=3.0,
|
||||
description="3-bit TurboQuant. High quality, ~6x KV compression."
|
||||
),
|
||||
QuantLevel(
|
||||
name="turbo2",
|
||||
bits_per_channel=1.5,
|
||||
compression_ratio=10.0,
|
||||
quality_label="balanced",
|
||||
layer_adaptive=3,
|
||||
kv_type="turbo2",
|
||||
min_memory_headroom_gb=2.0,
|
||||
description="2-bit TurboQuant. Balanced, ~10x KV compression."
|
||||
),
|
||||
QuantLevel(
|
||||
name="q4_0",
|
||||
bits_per_channel=4.0,
|
||||
compression_ratio=3.5,
|
||||
quality_label="fast",
|
||||
layer_adaptive=0,
|
||||
kv_type="q4_0",
|
||||
min_memory_headroom_gb=1.5,
|
||||
description="Standard 4-bit quant. Fast fallback, no TurboQuant."
|
||||
),
|
||||
]
|
||||
|
||||
|
||||
# ── Hardware Detection ────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class HardwareInfo:
|
||||
"""Detected hardware resources."""
|
||||
total_memory_gb: float
|
||||
available_memory_gb: float
|
||||
gpu_memory_gb: Optional[float] = None
|
||||
gpu_name: Optional[str] = None
|
||||
is_apple_silicon: bool = False
|
||||
chip_name: Optional[str] = None
|
||||
cpu_cores: int = 0
|
||||
detection_method: str = ""
|
||||
|
||||
|
||||
def detect_hardware() -> HardwareInfo:
|
||||
"""Detect available memory and GPU resources."""
|
||||
system = platform.system()
|
||||
|
||||
if system == "Darwin":
|
||||
return _detect_apple_silicon()
|
||||
elif system == "Linux":
|
||||
return _detect_linux()
|
||||
else:
|
||||
return _detect_generic(system)
|
||||
|
||||
|
||||
def _detect_apple_silicon() -> HardwareInfo:
|
||||
"""Detect Apple Silicon unified memory."""
|
||||
info = HardwareInfo(
|
||||
total_memory_gb=0,
|
||||
available_memory_gb=0,
|
||||
is_apple_silicon=True,
|
||||
detection_method="sysctl",
|
||||
)
|
||||
|
||||
try:
|
||||
# Get total memory
|
||||
result = subprocess.run(
|
||||
["sysctl", "-n", "hw.memsize"],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if result.returncode == 0:
|
||||
info.total_memory_gb = int(result.stdout.strip()) / (1024**3)
|
||||
|
||||
# Get chip name
|
||||
result = subprocess.run(
|
||||
["sysctl", "-n", "machdep.cpu.brand_string"],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if result.returncode == 0:
|
||||
info.chip_name = result.stdout.strip()
|
||||
|
||||
# Try to get GPU name (Apple Silicon)
|
||||
result = subprocess.run(
|
||||
["system_profiler", "SPDisplaysDataType"],
|
||||
capture_output=True, text=True, timeout=10
|
||||
)
|
||||
if result.returncode == 0:
|
||||
for line in result.stdout.split("\n"):
|
||||
if "Chipset" in line or "GPU" in line:
|
||||
info.gpu_name = line.split(":")[-1].strip()
|
||||
break
|
||||
|
||||
# Estimate available memory (vm_stat)
|
||||
result = subprocess.run(
|
||||
["vm_stat"],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if result.returncode == 0:
|
||||
page_size = 4096 # macOS default
|
||||
free_pages = 0
|
||||
for line in result.stdout.split("\n"):
|
||||
if "Pages free:" in line:
|
||||
try:
|
||||
free_pages = int(line.split(":")[-1].strip().rstrip("."))
|
||||
except ValueError:
|
||||
pass
|
||||
# Available ≈ free + some speculative (conservative: just free)
|
||||
info.available_memory_gb = (free_pages * page_size) / (1024**3)
|
||||
|
||||
# Fallback if vm_stat parsing failed
|
||||
if info.available_memory_gb < 1:
|
||||
# Conservative: 70% of total
|
||||
info.available_memory_gb = info.total_memory_gb * 0.70
|
||||
|
||||
# Apple Silicon shares memory — GPU memory = total memory
|
||||
info.gpu_memory_gb = info.total_memory_gb
|
||||
|
||||
# Detect CPU cores
|
||||
result = subprocess.run(
|
||||
["sysctl", "-n", "hw.ncpu"],
|
||||
capture_output=True, text=True, timeout=5
|
||||
)
|
||||
if result.returncode == 0:
|
||||
info.cpu_cores = int(result.stdout.strip())
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Apple Silicon detection failed: {e}")
|
||||
# Fallback
|
||||
info.total_memory_gb = 16.0
|
||||
info.available_memory_gb = 12.0
|
||||
info.detection_method = "fallback"
|
||||
|
||||
return info
|
||||
|
||||
|
||||
def _detect_linux() -> HardwareInfo:
|
||||
"""Detect Linux system with optional NVIDIA GPU."""
|
||||
info = HardwareInfo(
|
||||
total_memory_gb=0,
|
||||
available_memory_gb=0,
|
||||
detection_method="proc",
|
||||
)
|
||||
|
||||
try:
|
||||
# Read /proc/meminfo
|
||||
with open("/proc/meminfo", "r") as f:
|
||||
meminfo = f.read()
|
||||
|
||||
for line in meminfo.split("\n"):
|
||||
if line.startswith("MemTotal:"):
|
||||
kb = int(line.split()[1])
|
||||
info.total_memory_gb = kb / (1024 * 1024)
|
||||
elif line.startswith("MemAvailable:"):
|
||||
kb = int(line.split()[1])
|
||||
info.available_memory_gb = kb / (1024 * 1024)
|
||||
|
||||
# CPU cores
|
||||
info.cpu_cores = os.cpu_count() or 1
|
||||
|
||||
# Check for NVIDIA GPU
|
||||
try:
|
||||
result = subprocess.run(
|
||||
["nvidia-smi", "--query-gpu=name,memory.total,memory.free",
|
||||
"--format=csv,noheader,nounits"],
|
||||
capture_output=True, text=True, timeout=10
|
||||
)
|
||||
if result.returncode == 0 and result.stdout.strip():
|
||||
lines = result.stdout.strip().split("\n")
|
||||
if lines:
|
||||
parts = lines[0].split(", ")
|
||||
if len(parts) >= 3:
|
||||
info.gpu_name = parts[0].strip()
|
||||
info.gpu_memory_gb = float(parts[1]) / 1024 # MB to GB
|
||||
gpu_free = float(parts[2]) / 1024
|
||||
# Use GPU free for VRAM-based selection
|
||||
info.available_memory_gb = max(info.available_memory_gb, gpu_free)
|
||||
info.detection_method = "nvidia-smi"
|
||||
except (FileNotFoundError, subprocess.TimeoutExpired):
|
||||
pass # No NVIDIA GPU
|
||||
|
||||
except Exception as e:
|
||||
logger.warning(f"Linux detection failed: {e}")
|
||||
info.total_memory_gb = 16.0
|
||||
info.available_memory_gb = 12.0
|
||||
info.detection_method = "fallback"
|
||||
|
||||
return info
|
||||
|
||||
|
||||
def _detect_generic(system: str) -> HardwareInfo:
|
||||
"""Fallback detection for unknown systems."""
|
||||
import psutil
|
||||
mem = psutil.virtual_memory()
|
||||
return HardwareInfo(
|
||||
total_memory_gb=mem.total / (1024**3),
|
||||
available_memory_gb=mem.available / (1024**3),
|
||||
cpu_cores=os.cpu_count() or 1,
|
||||
detection_method="psutil",
|
||||
)
|
||||
|
||||
|
||||
# ── KV Cache Memory Estimation ───────────────────────────────────────────────
|
||||
|
||||
def estimate_kv_cache_gb(
|
||||
context_length: int,
|
||||
num_layers: int = 48,
|
||||
num_kv_heads: int = 8,
|
||||
head_dim: int = 128,
|
||||
bits_per_channel: float = 3.5,
|
||||
) -> float:
|
||||
"""Estimate KV cache memory for given parameters.
|
||||
|
||||
Formula: 2 (K+V) × layers × kv_heads × head_dim × context_length × bits/8
|
||||
"""
|
||||
bytes_per_element = bits_per_channel / 8.0
|
||||
total_bytes = 2 * num_layers * num_kv_heads * head_dim * context_length * bytes_per_element
|
||||
return total_bytes / (1024**3)
|
||||
|
||||
|
||||
def estimate_model_memory_gb(model_size_gb: float, quant_type: str = "q4_k_m") -> float:
|
||||
"""Estimate model weights memory. Returns loaded size in GB.
|
||||
|
||||
This is a rough estimate — actual depends on exact quant format.
|
||||
"""
|
||||
# Common quant ratios (vs fp16)
|
||||
quant_multipliers = {
|
||||
"f16": 1.0,
|
||||
"q8_0": 0.5,
|
||||
"q6_k": 0.42,
|
||||
"q5_k_m": 0.37,
|
||||
"q4_k_m": 0.32,
|
||||
"q3_k_m": 0.27,
|
||||
"q2_k": 0.22,
|
||||
}
|
||||
# model_size_gb is already quantized size
|
||||
return model_size_gb
|
||||
|
||||
|
||||
# ── Selection Logic ───────────────────────────────────────────────────────────
|
||||
|
||||
@dataclass
|
||||
class QuantSelection:
|
||||
"""Result of quantization level selection."""
|
||||
level: QuantLevel
|
||||
hardware: HardwareInfo
|
||||
reasoning: str
|
||||
total_required_gb: float
|
||||
available_gb: float
|
||||
headroom_gb: float
|
||||
env_vars: dict = field(default_factory=dict)
|
||||
server_flags: dict = field(default_factory=dict)
|
||||
warnings: list = field(default_factory=list)
|
||||
|
||||
|
||||
def select_quant_level(
|
||||
model_size_gb: float = 14.0,
|
||||
context_length: int = 32768,
|
||||
num_layers: int = 48,
|
||||
num_kv_heads: int = 8,
|
||||
head_dim: int = 128,
|
||||
preferred_level: Optional[str] = None,
|
||||
force_cpu: bool = False,
|
||||
) -> QuantSelection:
|
||||
"""Select the best quantization level for available hardware.
|
||||
|
||||
Args:
|
||||
model_size_gb: Size of the model weights in GB
|
||||
context_length: Target context length
|
||||
num_layers: Number of transformer layers
|
||||
num_kv_heads: Number of KV attention heads
|
||||
head_dim: Dimension per attention head
|
||||
preferred_level: Force a specific level (still checks if it fits)
|
||||
force_cpu: If True, ignore GPU memory
|
||||
|
||||
Returns:
|
||||
QuantSelection with the chosen level and reasoning
|
||||
"""
|
||||
hw = detect_hardware()
|
||||
|
||||
if force_cpu:
|
||||
hw.gpu_memory_gb = None
|
||||
hw.gpu_name = None
|
||||
|
||||
# Use the most restrictive memory constraint
|
||||
# For Apple Silicon: unified memory, use total
|
||||
# For NVIDIA: use GPU VRAM
|
||||
# For CPU-only: use system RAM
|
||||
if hw.gpu_memory_gb and hw.gpu_name:
|
||||
memory_pool_gb = hw.gpu_memory_gb
|
||||
memory_label = f"{hw.gpu_name} {hw.gpu_memory_gb:.0f}GB VRAM"
|
||||
elif hw.is_apple_silicon:
|
||||
memory_pool_gb = hw.total_memory_gb
|
||||
memory_label = f"{hw.chip_name or 'Apple Silicon'} {hw.total_memory_gb:.0f}GB unified"
|
||||
else:
|
||||
memory_pool_gb = hw.total_memory_gb
|
||||
memory_label = f"{hw.cpu_cores}c CPU {hw.total_memory_gb:.0f}GB RAM"
|
||||
|
||||
model_mem = estimate_model_memory_gb(model_size_gb)
|
||||
|
||||
# Try levels from best to most compressed
|
||||
chosen = None
|
||||
for level in QUANT_LEVELS:
|
||||
if preferred_level and level.name != preferred_level:
|
||||
continue
|
||||
|
||||
kv_mem = estimate_kv_cache_gb(
|
||||
context_length, num_layers, num_kv_heads, head_dim,
|
||||
level.bits_per_channel
|
||||
)
|
||||
total_required = model_mem + kv_mem
|
||||
headroom = memory_pool_gb - total_required
|
||||
|
||||
if headroom >= level.min_memory_headroom_gb:
|
||||
chosen = level
|
||||
break
|
||||
|
||||
if preferred_level and level.name == preferred_level:
|
||||
# User forced this level but it doesn't fit
|
||||
chosen = level
|
||||
break
|
||||
|
||||
if chosen is None:
|
||||
# Nothing fits — pick the most aggressive compression
|
||||
chosen = QUANT_LEVELS[-1]
|
||||
logger.warning(f"No quant level fits in {memory_pool_gb:.1f}GB. Using {chosen.name}.")
|
||||
|
||||
# Calculate final numbers
|
||||
kv_mem = estimate_kv_cache_gb(
|
||||
context_length, num_layers, num_kv_heads, head_dim,
|
||||
chosen.bits_per_channel
|
||||
)
|
||||
total_required = model_mem + kv_mem
|
||||
headroom = memory_pool_gb - total_required
|
||||
|
||||
# Build reasoning
|
||||
reasoning_parts = [
|
||||
f"{memory_label}:",
|
||||
f"{chosen.name} ({chosen.quality_label}, {chosen.bits_per_channel:.1f}b/ch,",
|
||||
f"{chosen.compression_ratio:.1f}x compression)",
|
||||
f"fits {model_mem:.1f}GB model + {kv_mem:.1f}GB KV cache",
|
||||
f"@ {context_length}K context = {total_required:.1f}GB / {memory_pool_gb:.0f}GB",
|
||||
f"({headroom:.1f}GB headroom)"
|
||||
]
|
||||
reasoning = " ".join(reasoning_parts)
|
||||
|
||||
# Build environment variables for llama.cpp
|
||||
env_vars = {
|
||||
"TURBO_LAYER_ADAPTIVE": str(chosen.layer_adaptive),
|
||||
}
|
||||
|
||||
# Build server flags
|
||||
server_flags = {
|
||||
"-ctk": chosen.kv_type,
|
||||
"-ctv": chosen.kv_type,
|
||||
"-c": str(context_length),
|
||||
}
|
||||
|
||||
# Warnings
|
||||
warnings = []
|
||||
if headroom < 2.0:
|
||||
warnings.append(
|
||||
f"Low headroom ({headroom:.1f}GB). Consider reducing context length or model size."
|
||||
)
|
||||
if headroom < 0:
|
||||
warnings.append(
|
||||
f"OVERCOMMITTED: needs {total_required:.1f}GB but only {memory_pool_gb:.0f}GB available. "
|
||||
f"Inference may fail or swap heavily."
|
||||
)
|
||||
|
||||
selection = QuantSelection(
|
||||
level=chosen,
|
||||
hardware=hw,
|
||||
reasoning=reasoning,
|
||||
total_required_gb=total_required,
|
||||
available_gb=memory_pool_gb,
|
||||
headroom_gb=headroom,
|
||||
env_vars=env_vars,
|
||||
server_flags=server_flags,
|
||||
warnings=warnings,
|
||||
)
|
||||
|
||||
logger.info(f"Quant selection: {reasoning}")
|
||||
for w in warnings:
|
||||
logger.warning(w)
|
||||
|
||||
return selection
|
||||
|
||||
|
||||
# ── CLI ───────────────────────────────────────────────────────────────────────
|
||||
|
||||
def main():
|
||||
"""CLI entry point for quant level selection."""
|
||||
import argparse
|
||||
import json
|
||||
|
||||
parser = argparse.ArgumentParser(
|
||||
description="Auto-select TurboQuant compression level based on available hardware"
|
||||
)
|
||||
parser.add_argument("--model-size", type=float, default=14.0,
|
||||
help="Model size in GB (default: 14.0)")
|
||||
parser.add_argument("--context", type=int, default=32768,
|
||||
help="Target context length (default: 32768)")
|
||||
parser.add_argument("--layers", type=int, default=48,
|
||||
help="Number of transformer layers (default: 48)")
|
||||
parser.add_argument("--kv-heads", type=int, default=8,
|
||||
help="Number of KV attention heads (default: 8)")
|
||||
parser.add_argument("--head-dim", type=int, default=128,
|
||||
help="Dimension per attention head (default: 128)")
|
||||
parser.add_argument("--prefer", type=str, default=None,
|
||||
choices=[l.name for l in QUANT_LEVELS],
|
||||
help="Prefer a specific quant level")
|
||||
parser.add_argument("--force-cpu", action="store_true",
|
||||
help="Ignore GPU, use CPU memory only")
|
||||
parser.add_argument("--json", action="store_true",
|
||||
help="JSON output for automation")
|
||||
parser.add_argument("--detect-only", action="store_true",
|
||||
help="Only detect hardware, don't select")
|
||||
args = parser.parse_args()
|
||||
|
||||
logging.basicConfig(level=logging.INFO, format="%(message)s")
|
||||
|
||||
if args.detect_only:
|
||||
hw = detect_hardware()
|
||||
if args.json:
|
||||
print(json.dumps(hw.__dict__, default=str, indent=2))
|
||||
else:
|
||||
print(f"Total memory: {hw.total_memory_gb:.1f} GB")
|
||||
print(f"Available: {hw.available_memory_gb:.1f} GB")
|
||||
if hw.gpu_memory_gb:
|
||||
print(f"GPU memory: {hw.gpu_memory_gb:.1f} GB")
|
||||
if hw.gpu_name:
|
||||
print(f"GPU: {hw.gpu_name}")
|
||||
if hw.is_apple_silicon:
|
||||
print(f"Chip: {hw.chip_name or 'Apple Silicon'}")
|
||||
print(f"CPU cores: {hw.cpu_cores}")
|
||||
print(f"Detection: {hw.detection_method}")
|
||||
return
|
||||
|
||||
selection = select_quant_level(
|
||||
model_size_gb=args.model_size,
|
||||
context_length=args.context,
|
||||
num_layers=args.layers,
|
||||
num_kv_heads=args.kv_heads,
|
||||
head_dim=args.head_dim,
|
||||
preferred_level=args.prefer,
|
||||
force_cpu=args.force_cpu,
|
||||
)
|
||||
|
||||
if args.json:
|
||||
result = {
|
||||
"level": selection.level.name,
|
||||
"bits_per_channel": selection.level.bits_per_channel,
|
||||
"compression_ratio": selection.level.compression_ratio,
|
||||
"quality": selection.level.quality_label,
|
||||
"reasoning": selection.reasoning,
|
||||
"total_required_gb": round(selection.total_required_gb, 2),
|
||||
"available_gb": round(selection.available_gb, 1),
|
||||
"headroom_gb": round(selection.headroom_gb, 2),
|
||||
"env_vars": selection.env_vars,
|
||||
"server_flags": selection.server_flags,
|
||||
"warnings": selection.warnings,
|
||||
"hardware": {
|
||||
"total_memory_gb": round(selection.hardware.total_memory_gb, 1),
|
||||
"gpu_name": selection.hardware.gpu_name,
|
||||
"is_apple_silicon": selection.hardware.is_apple_silicon,
|
||||
"chip_name": selection.hardware.chip_name,
|
||||
"cpu_cores": selection.hardware.cpu_cores,
|
||||
},
|
||||
}
|
||||
print(json.dumps(result, indent=2))
|
||||
else:
|
||||
print(f"Selected: {selection.level.name} ({selection.level.quality_label})")
|
||||
print(f" {selection.reasoning}")
|
||||
print()
|
||||
print(f"Environment variables:")
|
||||
for k, v in selection.env_vars.items():
|
||||
print(f" export {k}={v}")
|
||||
print()
|
||||
print(f"Server flags:")
|
||||
for k, v in selection.server_flags.items():
|
||||
print(f" {k} {v}")
|
||||
if selection.warnings:
|
||||
print()
|
||||
for w in selection.warnings:
|
||||
print(f" WARNING: {w}")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -1,386 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
crisis_detector.py — Edge crisis detection (keyword + optional LLM)
|
||||
|
||||
Zero-dependency crisis detector for offline edge devices.
|
||||
Tier 0: Keyword matching (instant, any device)
|
||||
Tier 1: Small LLM via llama.cpp (200ms on Pi 4, optional)
|
||||
|
||||
Usage:
|
||||
python3 crisis_detector.py # Interactive mode
|
||||
python3 crisis_detector.py --test # Run self-tests
|
||||
python3 crisis_detector.py --daemon # Headless daemon mode
|
||||
python3 crisis_detector.py --check "text" # Single check
|
||||
|
||||
Part of turboquant #99, #102.
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
from pathlib import Path
|
||||
|
||||
# ── Paths ───────────────────────────────────────────────────────────────────
|
||||
|
||||
SCRIPT_DIR = Path(__file__).resolve().parent
|
||||
DATA_DIR = SCRIPT_DIR.parent / "data"
|
||||
RESOURCES_FILE = DATA_DIR / "crisis_resources.json"
|
||||
|
||||
# ── Configuration ───────────────────────────────────────────────────────────
|
||||
|
||||
KEYWORD_THRESHOLD = 1 # Keywords needed to trigger (1 = high recall)
|
||||
LLM_THRESHOLD = 0.6 # LLM confidence cutoff
|
||||
LLM_TIMEOUT_MS = 5000 # Max LLM inference time
|
||||
LOG_FILE = None # Set by --log flag
|
||||
|
||||
# ── Tier 0: Keyword Detection ──────────────────────────────────────────────
|
||||
|
||||
# Crisis keywords organized by severity
|
||||
CRISIS_PATTERNS = {
|
||||
"immediate_danger": [
|
||||
r"\bkill(?:ing)?\s*(?:my)?self\b",
|
||||
r"\b(?:going|gonna|want(?:ing)?)\s*to\s*die\b",
|
||||
r"\b(?:end|ending)\s+(?:my|it|this)\s*life\b",
|
||||
r"\bsuicide\b",
|
||||
r"\bsuicidal\b",
|
||||
r"\bwanna\s*die\b",
|
||||
r"\bwant\s*to\s*die\b",
|
||||
r"\bcan'?t\s+(?:go\s+on|live|take\s+(?:it|this|anymore))\b",
|
||||
r"\bi'?m\s+(?:done|finished)\b.*\b(?:life|living)\b",
|
||||
r"\boverdose\b",
|
||||
r"\bhanging\s+(?:my)?self\b",
|
||||
r"\bjump(?:ing)?\s+(?:off|from)\b",
|
||||
r"\bslit(?:ting)?\s+(?:my\s+)?(?:wrists|throat)\b",
|
||||
r"\bshoot(?:ing)?\s+(?:my)?self\b",
|
||||
],
|
||||
"self_harm": [
|
||||
r"\bself[\s-]?harm(?:ing|ed)?\b",
|
||||
r"\bcut(?:ting)?\s+(?:my)?self\b",
|
||||
r"\bhurt(?:ing)?\s+(?:my)?self\b",
|
||||
r"\bpunish(?:ing)?\s+(?:my)?self\b",
|
||||
r"\bburn(?:ing)?\s+(?:my)?self\b",
|
||||
r"\bscar(?:ring)?\s+(?:my)?self\b",
|
||||
],
|
||||
"hopelessness": [
|
||||
r"\bhopeless\b",
|
||||
r"\bno\s+(?:point|reason|purpose)\b",
|
||||
r"\bwhy\s+(?:bother|try|am\s+i\s+here)\b",
|
||||
r"\bnobody\s+(?:cares|would\s+(?:miss|notice))\b",
|
||||
r"\bbeen\s+better\s+off\s+(?:dead|gone)\b",
|
||||
r"\bwouldn'?t\s+(?:miss|care)\b.*\b(?:if|when)\b.*\bdie\b",
|
||||
r"\bnothing\s+(?:matters|left)\b",
|
||||
r"\bgive\s+(?:up|me\s+death)\b",
|
||||
],
|
||||
"crisis_language": [
|
||||
r"\b(?:i|can'?t)\s+(?:handle|deal\s+with)\s+(?:this|it|anymore)\b",
|
||||
r"\btoo\s+much\s+(?:pain|suffering)\b",
|
||||
r"\bcan'?t\s+(?:take|stand)\s+(?:this|it|anymore)\b",
|
||||
r"\bbreak(?:ing|s)?\s+down\b",
|
||||
r"\b(?:i'?m|am)\s+(?:drowning|suffocating|dying)\b",
|
||||
r"\bsos\b",
|
||||
r"\bhelp\s+me\b.*\b(?:please|desperate)\b",
|
||||
r"\bemergency\b.*\b(?:mental|crisis)\b",
|
||||
r"\b(?:want|need|wish)(?:ing)?\s+(?:the|this|my)\s+pain\s+to\s+(?:stop|end|go\s+away)\b",
|
||||
r"\bmake\s+(?:the|this|my)\s+pain\s+(?:stop|end|go\s+away)\b",
|
||||
],
|
||||
}
|
||||
|
||||
# Compile all patterns
|
||||
_COMPILED_PATTERNS = {}
|
||||
for category, patterns in CRISIS_PATTERNS.items():
|
||||
_COMPILED_PATTERNS[category] = [re.compile(p, re.IGNORECASE) for p in patterns]
|
||||
|
||||
|
||||
def detect_keywords(text: str) -> dict:
|
||||
"""
|
||||
Tier 0 keyword detection. Returns match info.
|
||||
Result: {
|
||||
"detected": bool,
|
||||
"confidence": float (0-1),
|
||||
"categories": list[str],
|
||||
"matches": list[str]
|
||||
}
|
||||
"""
|
||||
matches = []
|
||||
categories = set()
|
||||
|
||||
for category, patterns in _COMPILED_PATTERNS.items():
|
||||
for pattern in patterns:
|
||||
m = pattern.search(text)
|
||||
if m:
|
||||
matches.append(m.group(0))
|
||||
categories.add(category)
|
||||
|
||||
detected = len(matches) >= KEYWORD_THRESHOLD
|
||||
|
||||
# Confidence heuristic
|
||||
if not detected:
|
||||
confidence = 0.0
|
||||
elif "immediate_danger" in categories:
|
||||
confidence = 0.95
|
||||
elif "self_harm" in categories:
|
||||
confidence = 0.85
|
||||
elif "hopelessness" in categories:
|
||||
confidence = 0.7
|
||||
else:
|
||||
confidence = 0.6
|
||||
|
||||
return {
|
||||
"detected": detected,
|
||||
"confidence": confidence,
|
||||
"categories": sorted(categories),
|
||||
"matches": matches,
|
||||
"method": "keyword",
|
||||
}
|
||||
|
||||
|
||||
# ── Tier 1: LLM Detection (Optional) ───────────────────────────────────────
|
||||
|
||||
def detect_llm(text: str, model_path: str = None) -> dict:
|
||||
"""
|
||||
Tier 1 LLM detection via llama.cpp subprocess.
|
||||
Falls back gracefully if model not available.
|
||||
"""
|
||||
if model_path is None:
|
||||
default_path = SCRIPT_DIR.parent / "models" / "falcon-h1-tiny-90m-q4km.gguf"
|
||||
model_path = str(default_path)
|
||||
|
||||
if not os.path.exists(model_path):
|
||||
return {
|
||||
"detected": False,
|
||||
"confidence": 0.0,
|
||||
"method": "llm_unavailable",
|
||||
"reason": "Model not found at " + model_path,
|
||||
}
|
||||
|
||||
# Build llama-cli command
|
||||
llama_bin = SCRIPT_DIR.parent / "llama.cpp" / "llama-cli"
|
||||
if not llama_bin.exists():
|
||||
llama_bin = SCRIPT_DIR.parent / "llama.cpp" / "build" / "bin" / "llama-cli"
|
||||
|
||||
if not llama_bin.exists():
|
||||
return {
|
||||
"detected": False,
|
||||
"confidence": 0.0,
|
||||
"method": "llm_unavailable",
|
||||
"reason": "llama-cli binary not found",
|
||||
}
|
||||
|
||||
prompt = (
|
||||
"You are a crisis detection system. Analyze the following text and "
|
||||
"determine if the person is in psychological crisis or expressing "
|
||||
"suicidal ideation.\n\n"
|
||||
f"Text: \"{text}\"\n\n"
|
||||
"Respond with ONLY one word: CRISIS or SAFE\n"
|
||||
)
|
||||
|
||||
try:
|
||||
import subprocess
|
||||
result = subprocess.run(
|
||||
[
|
||||
str(llama_bin),
|
||||
"-m", model_path,
|
||||
"-p", prompt,
|
||||
"-n", "5",
|
||||
"--temp", "0.1",
|
||||
"--no-display-prompt",
|
||||
],
|
||||
capture_output=True,
|
||||
text=True,
|
||||
timeout=LLM_TIMEOUT_MS / 1000,
|
||||
)
|
||||
output = result.stdout.strip().upper()
|
||||
|
||||
if "CRISIS" in output:
|
||||
return {
|
||||
"detected": True,
|
||||
"confidence": 0.8,
|
||||
"method": "llm",
|
||||
"raw_output": output[:100],
|
||||
}
|
||||
else:
|
||||
return {
|
||||
"detected": False,
|
||||
"confidence": 0.9,
|
||||
"method": "llm",
|
||||
"raw_output": output[:100],
|
||||
}
|
||||
except Exception as e:
|
||||
return {
|
||||
"detected": False,
|
||||
"confidence": 0.0,
|
||||
"method": "llm_error",
|
||||
"reason": str(e),
|
||||
}
|
||||
|
||||
|
||||
# ── Combined Detection ──────────────────────────────────────────────────────
|
||||
|
||||
def detect_crisis(text: str, use_llm: bool = True) -> dict:
|
||||
"""
|
||||
Full crisis detection pipeline: keyword first, then LLM if available.
|
||||
"""
|
||||
kw_result = detect_keywords(text)
|
||||
|
||||
if kw_result["detected"]:
|
||||
return kw_result
|
||||
|
||||
if use_llm:
|
||||
llm_result = detect_llm(text)
|
||||
if llm_result["detected"]:
|
||||
return llm_result
|
||||
|
||||
return {
|
||||
"detected": False,
|
||||
"confidence": 0.95,
|
||||
"categories": [],
|
||||
"matches": [],
|
||||
"method": "keyword+llm",
|
||||
}
|
||||
|
||||
|
||||
# ── Resource Display ────────────────────────────────────────────────────────
|
||||
|
||||
def load_resources() -> dict:
|
||||
"""Load offline crisis resources."""
|
||||
if RESOURCES_FILE.exists():
|
||||
with open(RESOURCES_FILE) as f:
|
||||
return json.load(f)
|
||||
return {
|
||||
"national_resources": [{
|
||||
"name": "988 Suicide & Crisis Lifeline",
|
||||
"phone": "988",
|
||||
"description": "Call or text 988 — free, confidential, 24/7",
|
||||
}],
|
||||
"local_resources": [],
|
||||
}
|
||||
|
||||
|
||||
def display_resources(result: dict) -> str:
|
||||
"""Format crisis resources for display."""
|
||||
resources = load_resources()
|
||||
lines = []
|
||||
lines.append("=" * 50)
|
||||
lines.append(" CRISIS RESOURCES — You are not alone")
|
||||
lines.append("=" * 50)
|
||||
lines.append("")
|
||||
|
||||
for r in resources.get("national_resources", []):
|
||||
lines.append(f" {r['name']}")
|
||||
lines.append(f" Phone: {r['phone']}")
|
||||
if r.get("description"):
|
||||
lines.append(f" {r['description']}")
|
||||
lines.append("")
|
||||
|
||||
for r in resources.get("local_resources", []):
|
||||
lines.append(f" {r['name']}")
|
||||
if r.get("phone"):
|
||||
lines.append(f" Phone: {r['phone']}")
|
||||
if r.get("address"):
|
||||
lines.append(f" Address: {r['address']}")
|
||||
if r.get("hours"):
|
||||
lines.append(f" Hours: {r['hours']}")
|
||||
lines.append("")
|
||||
|
||||
lines.append("-" * 50)
|
||||
lines.append(" Detection: " + result.get("method", "keyword"))
|
||||
lines.append(" Confidence: " + str(int(result.get("confidence", 0) * 100)) + "%")
|
||||
if result.get("categories"):
|
||||
lines.append(" Categories: " + ", ".join(result["categories"]))
|
||||
lines.append("=" * 50)
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
# ── CLI Interface ───────────────────────────────────────────────────────────
|
||||
|
||||
def run_tests():
|
||||
"""Run self-tests."""
|
||||
from tests.test_edge_crisis import run_all_tests
|
||||
return run_all_tests()
|
||||
|
||||
|
||||
def run_check(text: str):
|
||||
"""Single text check."""
|
||||
result = detect_crisis(text, use_llm=False)
|
||||
if result["detected"]:
|
||||
print(display_resources(result))
|
||||
else:
|
||||
print("SAFE — no crisis indicators detected")
|
||||
return result
|
||||
|
||||
|
||||
def run_interactive():
|
||||
"""Interactive mode — read lines from stdin, detect crisis."""
|
||||
resources = load_resources()
|
||||
print("Edge Crisis Detector (Ctrl+C to exit)")
|
||||
print("Type a message and press Enter to check.\n")
|
||||
|
||||
try:
|
||||
while True:
|
||||
try:
|
||||
text = input("> ").strip()
|
||||
except EOFError:
|
||||
break
|
||||
if not text:
|
||||
continue
|
||||
|
||||
result = detect_crisis(text, use_llm=False)
|
||||
if result["detected"]:
|
||||
print("\n" + display_resources(result) + "\n")
|
||||
else:
|
||||
print(" [safe]")
|
||||
except KeyboardInterrupt:
|
||||
print("\nExiting.")
|
||||
|
||||
|
||||
def run_daemon():
|
||||
"""Daemon mode — read from a named pipe or stdin, output results."""
|
||||
import select
|
||||
print("Edge Crisis Detector — daemon mode")
|
||||
print("Reading from stdin. Pipe text to detect.\n")
|
||||
|
||||
while True:
|
||||
try:
|
||||
line = sys.stdin.readline()
|
||||
if not line:
|
||||
break
|
||||
text = line.strip()
|
||||
if not text:
|
||||
continue
|
||||
|
||||
result = detect_crisis(text, use_llm=False)
|
||||
if result["detected"]:
|
||||
output = json.dumps({"crisis": True, "result": result, "resources": load_resources()})
|
||||
print(output, flush=True)
|
||||
else:
|
||||
print(json.dumps({"crisis": False}), flush=True)
|
||||
except KeyboardInterrupt:
|
||||
break
|
||||
|
||||
|
||||
def main():
|
||||
if "--test" in sys.argv:
|
||||
success = run_tests()
|
||||
sys.exit(0 if success else 1)
|
||||
|
||||
elif "--check" in sys.argv:
|
||||
idx = sys.argv.index("--check")
|
||||
if idx + 1 < len(sys.argv):
|
||||
text = " ".join(sys.argv[idx + 1:])
|
||||
run_check(text)
|
||||
else:
|
||||
print("Usage: crisis_detector.py --check 'text to check'")
|
||||
sys.exit(1)
|
||||
|
||||
elif "--daemon" in sys.argv:
|
||||
run_daemon()
|
||||
|
||||
else:
|
||||
run_interactive()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
3
tests/conftest.py
Normal file
3
tests/conftest.py
Normal file
@@ -0,0 +1,3 @@
|
||||
"""Pytest configuration for turboquant."""
|
||||
import sys, os
|
||||
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
|
||||
@@ -1,209 +0,0 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
test_edge_crisis.py — Offline crisis detection tests
|
||||
|
||||
Tests run with zero dependencies and no network access.
|
||||
Verifies keyword detection accuracy and resource loading.
|
||||
|
||||
Usage:
|
||||
python3 tests/test_edge_crisis.py
|
||||
python3 -m pytest tests/test_edge_crisis.py -v
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
# Add parent to path for imports
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
|
||||
from scripts.crisis_detector import detect_keywords, detect_crisis, load_resources
|
||||
|
||||
# ── Test Data ───────────────────────────────────────────────────────────────
|
||||
|
||||
# Texts that SHOULD trigger crisis detection
|
||||
CRISIS_TEXTS = [
|
||||
"I want to kill myself",
|
||||
"I'm going to die tonight",
|
||||
"I can't take this anymore, I want to end it",
|
||||
"Nobody would miss me if I were gone",
|
||||
"I've been cutting myself to feel something",
|
||||
"There's no point in living anymore",
|
||||
"I'm suicidal and I need help",
|
||||
"I just want the pain to stop forever",
|
||||
"I've been thinking about hanging myself",
|
||||
"Please help me I'm drowning in this",
|
||||
"I give up. Nothing matters anymore.",
|
||||
"I want to jump off the bridge",
|
||||
"I can't go on like this",
|
||||
"Help me I'm desperate",
|
||||
"I'm done with life",
|
||||
]
|
||||
|
||||
# Texts that should NOT trigger (common false positives)
|
||||
SAFE_TEXTS = [
|
||||
"What a kill in that game!",
|
||||
"I'm dying of laughter",
|
||||
"This movie killed me",
|
||||
"I want to kill it at my presentation",
|
||||
"I'm dead tired after work",
|
||||
"The jump scare got me",
|
||||
"I cut my finger cooking",
|
||||
"This game is killing me (hard level)",
|
||||
"The point of the story is unclear",
|
||||
"I'm done with this homework",
|
||||
"No point in arguing about it",
|
||||
"The bridge looks beautiful at sunset",
|
||||
"I gave up sugar for Lent",
|
||||
"The knife is in the drawer",
|
||||
"She's hanging the picture on the wall",
|
||||
]
|
||||
|
||||
|
||||
def test_crisis_texts_detected():
|
||||
"""All crisis texts must be detected."""
|
||||
failures = []
|
||||
for text in CRISIS_TEXTS:
|
||||
result = detect_keywords(text)
|
||||
if not result["detected"]:
|
||||
failures.append(f"MISSED: '{text}'")
|
||||
assert not failures, "Crisis texts not detected:\n" + "\n".join(failures)
|
||||
|
||||
|
||||
def test_safe_texts_not_detected():
|
||||
"""Safe texts must not trigger false positives."""
|
||||
failures = []
|
||||
for text in SAFE_TEXTS:
|
||||
result = detect_keywords(text)
|
||||
if result["detected"]:
|
||||
failures.append(f"FALSE POSITIVE: '{text}' -> {result['categories']}")
|
||||
# Allow some false positives — crisis detection should err on side of caution
|
||||
false_positive_rate = len(failures) / len(SAFE_TEXTS)
|
||||
assert false_positive_rate < 0.3, (
|
||||
f"False positive rate too high ({false_positive_rate:.0%}):\n"
|
||||
+ "\n".join(failures)
|
||||
)
|
||||
|
||||
|
||||
def test_immediate_danger_high_confidence():
|
||||
"""Immediate danger keywords must have high confidence."""
|
||||
for text in ["I want to kill myself", "I'm going to die tonight", "I'm suicidal"]:
|
||||
result = detect_keywords(text)
|
||||
assert result["detected"], f"Failed to detect: '{text}'"
|
||||
assert result["confidence"] >= 0.8, (
|
||||
f"Low confidence for immediate danger: {result['confidence']}"
|
||||
)
|
||||
|
||||
|
||||
def test_resources_load():
|
||||
"""Resources file must exist and be valid."""
|
||||
resources = load_resources()
|
||||
assert "national_resources" in resources
|
||||
assert len(resources["national_resources"]) >= 1
|
||||
|
||||
# 988 lifeline must be present
|
||||
phones = [r.get("phone", "") for r in resources["national_resources"]]
|
||||
assert any("988" in p for p in phones), "988 Lifeline not in resources"
|
||||
|
||||
|
||||
def test_resources_have_required_fields():
|
||||
"""All national resources must have name and contact method."""
|
||||
resources = load_resources()
|
||||
for r in resources["national_resources"]:
|
||||
assert "name" in r, f"Resource missing name: {r}"
|
||||
has_contact = r.get("phone") or r.get("text") or r.get("url")
|
||||
assert has_contact, f"Resource missing contact: {r['name']}"
|
||||
|
||||
|
||||
def test_keyword_categories():
|
||||
"""Verify all keyword categories are represented."""
|
||||
for text, expected_cats in [
|
||||
("I want to kill myself", ["immediate_danger"]),
|
||||
("I've been cutting myself", ["self_harm"]),
|
||||
("There's no point in living", ["hopelessness"]),
|
||||
]:
|
||||
result = detect_keywords(text)
|
||||
assert result["detected"], f"Should detect: '{text}'"
|
||||
for cat in expected_cats:
|
||||
assert cat in result["categories"], (
|
||||
f"Expected category '{cat}' for '{text}', got {result['categories']}"
|
||||
)
|
||||
|
||||
|
||||
def test_empty_text_safe():
|
||||
"""Empty text must not trigger."""
|
||||
result = detect_keywords("")
|
||||
assert not result["detected"]
|
||||
assert result["confidence"] == 0.0
|
||||
|
||||
|
||||
def test_detect_crisis_combined():
|
||||
"""Combined detect_crisis function works (keyword-only, no LLM)."""
|
||||
result = detect_crisis("I want to kill myself", use_llm=False)
|
||||
assert result["detected"]
|
||||
|
||||
result2 = detect_crisis("Nice weather today", use_llm=False)
|
||||
assert not result2["detected"]
|
||||
|
||||
|
||||
def test_resource_file_exists():
|
||||
"""The resources JSON file must exist."""
|
||||
resources_file = Path(__file__).resolve().parent.parent / "data" / "crisis_resources.json"
|
||||
assert resources_file.exists(), f"Missing: {resources_file}"
|
||||
|
||||
|
||||
def test_resources_json_valid():
|
||||
"""Resources file must be valid JSON with expected structure."""
|
||||
resources_file = Path(__file__).resolve().parent.parent / "data" / "crisis_resources.json"
|
||||
with open(resources_file) as f:
|
||||
data = json.load(f)
|
||||
assert "version" in data
|
||||
assert "national_resources" in data
|
||||
assert "self_help_prompts" in data
|
||||
assert len(data["national_resources"]) >= 3
|
||||
|
||||
|
||||
# ── Runner ──────────────────────────────────────────────────────────────────
|
||||
|
||||
def run_all_tests():
|
||||
"""Run all tests without pytest."""
|
||||
tests = [
|
||||
test_crisis_texts_detected,
|
||||
test_safe_texts_not_detected,
|
||||
test_immediate_danger_high_confidence,
|
||||
test_resources_load,
|
||||
test_resources_have_required_fields,
|
||||
test_keyword_categories,
|
||||
test_empty_text_safe,
|
||||
test_detect_crisis_combined,
|
||||
test_resource_file_exists,
|
||||
test_resources_json_valid,
|
||||
]
|
||||
|
||||
passed = 0
|
||||
failed = 0
|
||||
|
||||
for test in tests:
|
||||
name = test.__name__
|
||||
try:
|
||||
test()
|
||||
print(f" PASS: {name}")
|
||||
passed += 1
|
||||
except AssertionError as e:
|
||||
print(f" FAIL: {name}")
|
||||
print(f" {e}")
|
||||
failed += 1
|
||||
except Exception as e:
|
||||
print(f" ERROR: {name}: {e}")
|
||||
failed += 1
|
||||
|
||||
print(f"\n{'='*50}")
|
||||
print(f"Results: {passed} passed, {failed} failed, {passed+failed} total")
|
||||
print(f"{'='*50}")
|
||||
|
||||
return failed == 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
success = run_all_tests()
|
||||
sys.exit(0 if success else 1)
|
||||
21
tests/test_hardware_optimizer.py
Normal file
21
tests/test_hardware_optimizer.py
Normal file
@@ -0,0 +1,21 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for hardware_optimizer compatibility shim."""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
|
||||
from evolution import hardware_optimizer, quant_selector
|
||||
|
||||
|
||||
def test_hardware_optimizer_reexports_quant_selector_api():
|
||||
assert hardware_optimizer.select_quant_level is quant_selector.select_quant_level
|
||||
assert hardware_optimizer.detect_hardware is quant_selector.detect_hardware
|
||||
assert hardware_optimizer.HardwareInfo is quant_selector.HardwareInfo
|
||||
assert hardware_optimizer.QuantSelection is quant_selector.QuantSelection
|
||||
|
||||
|
||||
def test_hardware_optimizer_exports_quant_level_definitions():
|
||||
assert hardware_optimizer.QUANT_LEVELS is quant_selector.QUANT_LEVELS
|
||||
assert hardware_optimizer.QuantLevel is quant_selector.QuantLevel
|
||||
74
tests/test_markdown_link_check.py
Normal file
74
tests/test_markdown_link_check.py
Normal file
@@ -0,0 +1,74 @@
|
||||
import textwrap
|
||||
from pathlib import Path
|
||||
|
||||
from check_markdown_links import find_broken_links
|
||||
|
||||
|
||||
def write(path: Path, content: str) -> None:
|
||||
path.parent.mkdir(parents=True, exist_ok=True)
|
||||
path.write_text(textwrap.dedent(content).lstrip(), encoding="utf-8")
|
||||
|
||||
|
||||
def test_reports_missing_local_markdown_target_with_line_number(tmp_path: Path):
|
||||
write(
|
||||
tmp_path / "README.md",
|
||||
"""
|
||||
# Repo
|
||||
|
||||
See [status](docs/status.md).
|
||||
""",
|
||||
)
|
||||
|
||||
broken = find_broken_links(tmp_path)
|
||||
|
||||
assert len(broken) == 1
|
||||
assert broken[0]["source"].endswith("README.md")
|
||||
assert broken[0]["line"] == 3
|
||||
assert broken[0]["target"] == "docs/status.md"
|
||||
|
||||
|
||||
def test_allows_existing_relative_targets(tmp_path: Path):
|
||||
write(tmp_path / "docs" / "status.md", "# Status\n")
|
||||
write(
|
||||
tmp_path / "README.md",
|
||||
"""
|
||||
# Repo
|
||||
|
||||
See [status](docs/status.md).
|
||||
""",
|
||||
)
|
||||
|
||||
assert find_broken_links(tmp_path) == []
|
||||
|
||||
|
||||
def test_ignores_external_anchor_mailto_and_tel_links(tmp_path: Path):
|
||||
write(
|
||||
tmp_path / "README.md",
|
||||
"""
|
||||
[external](https://example.com)
|
||||
[anchor](#section)
|
||||
[mail](mailto:test@example.com)
|
||||
[call](tel:988)
|
||||
""",
|
||||
)
|
||||
|
||||
assert find_broken_links(tmp_path) == []
|
||||
|
||||
|
||||
def test_ignores_links_inside_fenced_code_blocks(tmp_path: Path):
|
||||
write(
|
||||
tmp_path / "README.md",
|
||||
"""
|
||||
```md
|
||||
[broken](docs/missing.md)
|
||||
```
|
||||
""",
|
||||
)
|
||||
|
||||
assert find_broken_links(tmp_path) == []
|
||||
|
||||
|
||||
def test_skips_build_directories(tmp_path: Path):
|
||||
write(tmp_path / "build" / "README.md", "[broken](missing.md)\n")
|
||||
|
||||
assert find_broken_links(tmp_path) == []
|
||||
189
tests/test_quant_selector.py
Normal file
189
tests/test_quant_selector.py
Normal file
@@ -0,0 +1,189 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for quant_selector.py"""
|
||||
|
||||
import sys
|
||||
import os
|
||||
import pytest
|
||||
from unittest.mock import patch, MagicMock
|
||||
|
||||
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
|
||||
from evolution.quant_selector import (
|
||||
QuantLevel,
|
||||
HardwareInfo,
|
||||
QUANT_LEVELS,
|
||||
detect_hardware,
|
||||
estimate_kv_cache_gb,
|
||||
estimate_model_memory_gb,
|
||||
select_quant_level,
|
||||
)
|
||||
|
||||
|
||||
class TestQuantLevels:
|
||||
def test_levels_ordered_by_quality(self):
|
||||
"""TurboQuant levels should be ordered from best quality to most aggressive.
|
||||
|
||||
The quality ordering invariant for TurboQuant levels is monotonically
|
||||
increasing compression_ratio (more aggressive = more compression).
|
||||
Non-TurboQuant fallbacks (e.g. q4_0) are placed after all TurboQuant
|
||||
levels and may have any compression ratio — they exist as safe defaults,
|
||||
not as part of the quality progression.
|
||||
"""
|
||||
turbo_quant_names = {"turbo4", "turbo3", "turbo2"}
|
||||
turbo_levels = [l for l in QUANT_LEVELS if l.name in turbo_quant_names]
|
||||
for i in range(len(turbo_levels) - 1):
|
||||
assert turbo_levels[i].compression_ratio <= turbo_levels[i + 1].compression_ratio, (
|
||||
f"TurboQuant {turbo_levels[i].name} (compression={turbo_levels[i].compression_ratio}x) "
|
||||
f"should have <= compression than {turbo_levels[i+1].name} "
|
||||
f"(compression={turbo_levels[i+1].compression_ratio}x)"
|
||||
)
|
||||
|
||||
def test_fallback_quant_is_last(self):
|
||||
"""Non-TurboQuant fallbacks (e.g. q4_0) should be at the end of the list."""
|
||||
turbo_quant_names = {"turbo4", "turbo3", "turbo2"}
|
||||
found_fallback = False
|
||||
for level in QUANT_LEVELS:
|
||||
if level.name not in turbo_quant_names:
|
||||
found_fallback = True
|
||||
elif found_fallback:
|
||||
pytest.fail(
|
||||
f"TurboQuant level '{level.name}' appears after a fallback level. "
|
||||
f"All TurboQuant levels must precede fallbacks."
|
||||
)
|
||||
|
||||
def test_all_levels_have_required_fields(self):
|
||||
for level in QUANT_LEVELS:
|
||||
assert level.name
|
||||
assert level.bits_per_channel > 0
|
||||
assert level.compression_ratio > 1
|
||||
assert level.quality_label
|
||||
assert level.layer_adaptive >= 0
|
||||
assert level.kv_type
|
||||
|
||||
|
||||
class TestKVEstimate:
|
||||
def test_basic_estimate(self):
|
||||
# 48 layers, 8 heads, 128 dim, 32K context, 3.5 bits
|
||||
kv_gb = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
|
||||
assert kv_gb > 0
|
||||
assert kv_gb < 10 # Should be reasonable
|
||||
|
||||
def test_longer_context_larger(self):
|
||||
kv_32k = estimate_kv_cache_gb(32768, 48, 8, 128, 3.5)
|
||||
kv_128k = estimate_kv_cache_gb(131072, 48, 8, 128, 3.5)
|
||||
assert kv_128k > kv_32k
|
||||
|
||||
def test_higher_bits_larger(self):
|
||||
kv_4b = estimate_kv_cache_gb(32768, 48, 8, 128, 4.0)
|
||||
kv_2b = estimate_kv_cache_gb(32768, 48, 8, 128, 2.0)
|
||||
assert kv_4b > kv_2b
|
||||
|
||||
|
||||
class TestHardwareDetection:
|
||||
def test_detect_returns_info(self):
|
||||
hw = detect_hardware()
|
||||
assert hw.total_memory_gb > 0
|
||||
assert hw.available_memory_gb > 0
|
||||
assert hw.detection_method
|
||||
|
||||
@patch("evolution.quant_selector.platform.system", return_value="Linux")
|
||||
@patch("builtins.open", create=True)
|
||||
def test_linux_detection(self, mock_open, mock_system):
|
||||
mock_open.return_value.__enter__().read.return_value = (
|
||||
"MemTotal: 32000000 kB\n"
|
||||
"MemAvailable: 24000000 kB\n"
|
||||
)
|
||||
hw = _detect_linux_fallback()
|
||||
assert hw.total_memory_gb > 20
|
||||
|
||||
|
||||
def _detect_linux_fallback():
|
||||
"""Helper to test Linux detection with mocked /proc/meminfo."""
|
||||
from evolution.quant_selector import _detect_linux
|
||||
return _detect_linux()
|
||||
|
||||
|
||||
class TestSelection:
|
||||
def test_selects_turbo4_for_large_memory(self):
|
||||
"""With plenty of memory, should pick turbo4 (best quality)."""
|
||||
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
|
||||
mock_hw.return_value = HardwareInfo(
|
||||
total_memory_gb=64,
|
||||
available_memory_gb=48,
|
||||
gpu_memory_gb=64,
|
||||
gpu_name="Test GPU",
|
||||
cpu_cores=16,
|
||||
detection_method="mock",
|
||||
)
|
||||
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
|
||||
assert sel.level.name == "turbo4"
|
||||
assert sel.headroom_gb > 0
|
||||
|
||||
def test_selects_smaller_for_tight_memory(self):
|
||||
"""With tight memory, should pick a smaller quant."""
|
||||
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
|
||||
mock_hw.return_value = HardwareInfo(
|
||||
total_memory_gb=16,
|
||||
available_memory_gb=12,
|
||||
gpu_memory_gb=16,
|
||||
gpu_name="Test GPU",
|
||||
cpu_cores=8,
|
||||
detection_method="mock",
|
||||
)
|
||||
sel = select_quant_level(model_size_gb=14.0, context_length=131072)
|
||||
# Should pick a smaller quant for 128K context on 16GB
|
||||
assert sel.level.bits_per_channel <= 4.0
|
||||
|
||||
def test_preferred_level(self):
|
||||
"""User can force a specific level."""
|
||||
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
|
||||
mock_hw.return_value = HardwareInfo(
|
||||
total_memory_gb=64,
|
||||
available_memory_gb=48,
|
||||
cpu_cores=16,
|
||||
detection_method="mock",
|
||||
)
|
||||
sel = select_quant_level(
|
||||
model_size_gb=14.0, context_length=32768,
|
||||
preferred_level="turbo2"
|
||||
)
|
||||
assert sel.level.name == "turbo2"
|
||||
|
||||
def test_env_vars_populated(self):
|
||||
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
|
||||
mock_hw.return_value = HardwareInfo(
|
||||
total_memory_gb=64,
|
||||
available_memory_gb=48,
|
||||
cpu_cores=16,
|
||||
detection_method="mock",
|
||||
)
|
||||
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
|
||||
assert "TURBO_LAYER_ADAPTIVE" in sel.env_vars
|
||||
assert "-ctk" in sel.server_flags
|
||||
assert "-ctv" in sel.server_flags
|
||||
|
||||
def test_warnings_on_low_headroom(self):
|
||||
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
|
||||
mock_hw.return_value = HardwareInfo(
|
||||
total_memory_gb=18,
|
||||
available_memory_gb=14,
|
||||
gpu_memory_gb=18,
|
||||
gpu_name="Test GPU",
|
||||
cpu_cores=8,
|
||||
detection_method="mock",
|
||||
)
|
||||
sel = select_quant_level(model_size_gb=16.0, context_length=65536)
|
||||
assert len(sel.warnings) > 0
|
||||
|
||||
def test_reasoning_contains_key_info(self):
|
||||
with patch("evolution.quant_selector.detect_hardware") as mock_hw:
|
||||
mock_hw.return_value = HardwareInfo(
|
||||
total_memory_gb=32,
|
||||
available_memory_gb=24,
|
||||
is_apple_silicon=True,
|
||||
chip_name="M4 Max",
|
||||
cpu_cores=16,
|
||||
detection_method="mock",
|
||||
)
|
||||
sel = select_quant_level(model_size_gb=14.0, context_length=32768)
|
||||
assert "turbo4" in sel.reasoning
|
||||
assert "M4 Max" in sel.reasoning or "32GB" in sel.reasoning
|
||||
83
tests/test_smoke_workflow.py
Normal file
83
tests/test_smoke_workflow.py
Normal file
@@ -0,0 +1,83 @@
|
||||
"""Tests for smoke workflow CI configuration.
|
||||
|
||||
Validates that the GitHub Actions / Gitea Actions smoke workflow
|
||||
actually runs the standalone CMake build and test suite, not just
|
||||
parse checks.
|
||||
"""
|
||||
|
||||
from pathlib import Path
|
||||
|
||||
import yaml
|
||||
|
||||
import pytest
|
||||
|
||||
|
||||
WORKFLOW_PATH = Path(".gitea/workflows/smoke.yml")
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def workflow():
|
||||
"""Load and parse the smoke workflow YAML."""
|
||||
content = WORKFLOW_PATH.read_text(encoding="utf-8")
|
||||
return yaml.safe_load(content)
|
||||
|
||||
|
||||
def test_smoke_workflow_exists():
|
||||
"""Smoke workflow file must exist."""
|
||||
assert WORKFLOW_PATH.exists(), f"Missing {WORKFLOW_PATH}"
|
||||
|
||||
|
||||
def test_smoke_has_cmake_configure_step(workflow):
|
||||
"""Smoke workflow must configure the CMake project with tests enabled."""
|
||||
steps = workflow["jobs"]["smoke"]["steps"]
|
||||
cmake_found = False
|
||||
for step in steps:
|
||||
run = step.get("run", "")
|
||||
if "cmake -S . -B build" in run and "TURBOQUANT_BUILD_TESTS=ON" in run:
|
||||
cmake_found = True
|
||||
break
|
||||
assert cmake_found, (
|
||||
"Smoke workflow missing cmake configure step with TURBOQUANT_BUILD_TESTS=ON"
|
||||
)
|
||||
|
||||
|
||||
def test_smoke_has_cmake_build_step(workflow):
|
||||
"""Smoke workflow must build the CMake project."""
|
||||
steps = workflow["jobs"]["smoke"]["steps"]
|
||||
build_found = False
|
||||
for step in steps:
|
||||
run = step.get("run", "")
|
||||
if "cmake --build build" in run:
|
||||
build_found = True
|
||||
break
|
||||
assert build_found, "Smoke workflow missing cmake --build step"
|
||||
|
||||
|
||||
def test_smoke_has_ctest_step(workflow):
|
||||
"""Smoke workflow must run ctest."""
|
||||
steps = workflow["jobs"]["smoke"]["steps"]
|
||||
ctest_found = False
|
||||
for step in steps:
|
||||
run = step.get("run", "")
|
||||
if "ctest" in run and "output-on-failure" in run:
|
||||
ctest_found = True
|
||||
break
|
||||
assert ctest_found, "Smoke workflow missing ctest --output-on-failure step"
|
||||
|
||||
|
||||
def test_smoke_build_before_secret_scan(workflow):
|
||||
"""Build and test steps must run before secret scan (fail fast on build errors)."""
|
||||
steps = workflow["jobs"]["smoke"]["steps"]
|
||||
names = [s.get("name", "") for s in steps]
|
||||
build_idx = None
|
||||
scan_idx = None
|
||||
for i, name in enumerate(names):
|
||||
if "cmake" in name.lower() or "build" in name.lower():
|
||||
if build_idx is None:
|
||||
build_idx = i
|
||||
if "secret" in name.lower():
|
||||
scan_idx = i
|
||||
if build_idx is not None and scan_idx is not None:
|
||||
assert build_idx < scan_idx, (
|
||||
"Build step should run before secret scan to fail fast on broken code"
|
||||
)
|
||||
338
tests/test_tool_call_integration.py
Normal file
338
tests/test_tool_call_integration.py
Normal file
@@ -0,0 +1,338 @@
|
||||
"""
|
||||
Integration test: turboquant compressed model passes hermes tool calls (issue #82).
|
||||
|
||||
Validates that a TurboQuant-compressed model can:
|
||||
1. Parse hermes tool schemas correctly
|
||||
2. Format tool calls in OpenAI-compatible format
|
||||
3. Pass through the hermes agent conversation loop
|
||||
|
||||
Tests are structured as contract tests -- they validate the schema/format
|
||||
compatibility without requiring a running model server. The live inference
|
||||
test is skipped by default (requires llama-server with TurboQuant model).
|
||||
|
||||
Usage:
|
||||
pytest tests/test_tool_call_integration.py -v
|
||||
pytest tests/test_tool_call_integration.py -v -k live # run live test if server available
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import unittest
|
||||
|
||||
import pytest
|
||||
|
||||
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
||||
PROFILE_PATH = ROOT / "profiles" / "hermes-profile-gemma4-turboquant.yaml"
|
||||
BENCHMARKS_DIR = ROOT / "benchmarks"
|
||||
|
||||
|
||||
class TestHermesProfileSchema(unittest.TestCase):
|
||||
"""Validate the hermes profile YAML has required fields for tool calling."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
import yaml
|
||||
cls.profile = yaml.safe_load(PROFILE_PATH.read_text())
|
||||
|
||||
def test_profile_has_providers(self):
|
||||
assert "providers" in self.profile, "Profile must define providers"
|
||||
assert "primary" in self.profile["providers"], "Must have primary provider"
|
||||
|
||||
def test_primary_provider_has_endpoint(self):
|
||||
primary = self.profile["providers"]["primary"]
|
||||
assert "endpoint" in primary, "Primary provider must have endpoint"
|
||||
assert primary["endpoint"].startswith("http"), "Endpoint must be HTTP(S) URL"
|
||||
|
||||
def test_primary_provider_has_api_path(self):
|
||||
primary = self.profile["providers"]["primary"]
|
||||
assert "api_path" in primary, "Primary provider must have api_path"
|
||||
assert "/chat/completions" in primary["api_path"], (
|
||||
"api_path should be OpenAI-compatible /chat/completions"
|
||||
)
|
||||
|
||||
def test_turboquant_settings_present(self):
|
||||
primary = self.profile["providers"]["primary"]
|
||||
assert "turboquant" in primary, "Must have turboquant config section"
|
||||
tq = primary["turboquant"]
|
||||
assert tq.get("enabled") is True, "TurboQuant must be enabled"
|
||||
assert tq.get("kv_type") in ("turbo2", "turbo3", "turbo4"), (
|
||||
"kv_type must be turbo2, turbo3, or turbo4"
|
||||
)
|
||||
|
||||
def test_context_window_configured(self):
|
||||
primary = self.profile["providers"]["primary"]
|
||||
assert "context" in primary, "Must have context config"
|
||||
ctx = primary["context"]
|
||||
assert ctx.get("max_tokens", 0) >= 8192, (
|
||||
"max_tokens should be >= 8192 for TurboQuant value proposition"
|
||||
)
|
||||
|
||||
|
||||
class TestToolSchemaCompatibility(unittest.TestCase):
|
||||
"""Verify hermes tool schemas serialize to valid JSON for OpenAI tool_calls."""
|
||||
|
||||
SAMPLE_TOOL_SCHEMAS = [
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "read_file",
|
||||
"description": "Read a text file with line numbers.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string", "description": "File path"},
|
||||
"offset": {"type": "integer", "default": 1},
|
||||
"limit": {"type": "integer", "default": 500},
|
||||
},
|
||||
"required": ["path"],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "execute_code",
|
||||
"description": "Run a Python script.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"code": {"type": "string", "description": "Python code"},
|
||||
},
|
||||
"required": ["code"],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "web_search",
|
||||
"description": "Search the web.",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"query": {"type": "string"},
|
||||
"max_results": {"type": "integer", "default": 5},
|
||||
},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
},
|
||||
]
|
||||
|
||||
def test_tool_schemas_serialize_to_json(self):
|
||||
"""Tool schemas must serialize without errors."""
|
||||
serialized = json.dumps(self.SAMPLE_TOOL_SCHEMAS)
|
||||
assert len(serialized) > 0
|
||||
parsed = json.loads(serialized)
|
||||
assert len(parsed) == len(self.SAMPLE_TOOL_SCHEMAS)
|
||||
|
||||
def test_tool_schemas_have_required_openai_fields(self):
|
||||
"""Each tool schema must have the fields OpenAI expects."""
|
||||
for tool in self.SAMPLE_TOOL_SCHEMAS:
|
||||
assert tool["type"] == "function", "Tool type must be 'function'"
|
||||
fn = tool["function"]
|
||||
assert "name" in fn, "Function must have name"
|
||||
assert "description" in fn, "Function must have description"
|
||||
assert "parameters" in fn, "Function must have parameters"
|
||||
params = fn["parameters"]
|
||||
assert params["type"] == "object", "Parameters type must be 'object'"
|
||||
assert "properties" in params, "Parameters must have properties"
|
||||
|
||||
def test_tool_call_response_format(self):
|
||||
"""Verify tool_call response matches OpenAI format."""
|
||||
tool_call = {
|
||||
"id": "call_abc123",
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "read_file",
|
||||
"arguments": json.dumps({"path": "/tmp/test.txt"}),
|
||||
},
|
||||
}
|
||||
args = json.loads(tool_call["function"]["arguments"])
|
||||
assert args["path"] == "/tmp/test.txt"
|
||||
assert tool_call["function"]["name"] in [
|
||||
t["function"]["name"] for t in self.SAMPLE_TOOL_SCHEMAS
|
||||
]
|
||||
|
||||
def test_tool_names_are_valid_identifiers(self):
|
||||
"""Tool names must be valid Python identifiers for hermes dispatch."""
|
||||
for tool in self.SAMPLE_TOOL_SCHEMAS:
|
||||
name = tool["function"]["name"]
|
||||
assert re.match(r"^[a-zA-Z_][a-zA-Z0-9_]*$", name), (
|
||||
f"Tool name \'{name}\' is not a valid identifier"
|
||||
)
|
||||
|
||||
|
||||
class TestTurboquantServerConfig(unittest.TestCase):
|
||||
"""Validate server startup configuration matches hermes profile."""
|
||||
|
||||
def test_server_command_has_turboquant_flags(self):
|
||||
"""The server command in the profile must include -ctk/-ctv flags."""
|
||||
profile_text = PROFILE_PATH.read_text()
|
||||
assert "-ctk" in profile_text, "Profile server command must include -ctk flag"
|
||||
assert "-ctv" in profile_text, "Profile server command must include -ctv flag"
|
||||
|
||||
def test_server_command_has_context_flag(self):
|
||||
"""Server command must set context size."""
|
||||
profile_text = PROFILE_PATH.read_text()
|
||||
assert re.search(r"-c\s+\d+", profile_text), (
|
||||
"Server command must include -c <context_size> flag"
|
||||
)
|
||||
|
||||
def test_layer_adaptive_env_var(self):
|
||||
"""Profile must set TURBO_LAYER_ADAPTIVE env var."""
|
||||
profile_text = PROFILE_PATH.read_text()
|
||||
assert "TURBO_LAYER_ADAPTIVE" in profile_text, (
|
||||
"Profile must configure TURBO_LAYER_ADAPTIVE"
|
||||
)
|
||||
|
||||
|
||||
class TestBenchmarkData(unittest.TestCase):
|
||||
"""Validate benchmark test prompts include tool-call test cases."""
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
prompts_path = BENCHMARKS_DIR / "test_prompts.json"
|
||||
cls.prompts = json.loads(prompts_path.read_text())
|
||||
|
||||
def test_has_tool_call_test_prompt(self):
|
||||
"""Benchmark prompts must include a tool-call format test."""
|
||||
categories = [p.get("category") for p in self.prompts]
|
||||
assert "tool_call_format" in categories, (
|
||||
"Benchmark must include a tool_call_format test case"
|
||||
)
|
||||
|
||||
def test_tool_call_prompt_expects_json(self):
|
||||
"""Tool call test prompt must expect JSON in the response."""
|
||||
tool_prompt = next(
|
||||
p for p in self.prompts if p.get("category") == "tool_call_format"
|
||||
)
|
||||
pattern = tool_prompt.get("expected_pattern", "")
|
||||
assert "json" in pattern.lower() or "\\{" in pattern, (
|
||||
"Tool call prompt must expect JSON-formatted response"
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not os.environ.get("TURBOQUANT_SERVER_URL"),
|
||||
reason="No TurboQuant server available (set TURBOQUANT_SERVER_URL to run)",
|
||||
)
|
||||
class TestLiveToolCallIntegration:
|
||||
"""Live integration test -- requires running llama-server with TurboQuant."""
|
||||
|
||||
def test_server_health(self):
|
||||
"""Server must respond to /v1/models endpoint."""
|
||||
import requests
|
||||
url = os.environ["TURBOQUANT_SERVER_URL"]
|
||||
resp = requests.get(f"{url}/v1/models", timeout=10)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "data" in data
|
||||
assert len(data["data"]) > 0
|
||||
|
||||
def test_tool_call_completion(self):
|
||||
"""Model must return a valid tool_call for a read_file prompt."""
|
||||
import requests
|
||||
url = os.environ["TURBOQUANT_SERVER_URL"]
|
||||
tools = [
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "read_file",
|
||||
"description": "Read a file",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {"path": {"type": "string"}},
|
||||
"required": ["path"],
|
||||
},
|
||||
},
|
||||
}
|
||||
]
|
||||
resp = requests.post(
|
||||
f"{url}/v1/chat/completions",
|
||||
json={
|
||||
"model": "gemma-4",
|
||||
"messages": [
|
||||
{"role": "user", "content": "Read the file at /tmp/test.txt"}
|
||||
],
|
||||
"tools": tools,
|
||||
"tool_choice": "auto",
|
||||
},
|
||||
timeout=120,
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
choice = data["choices"][0]
|
||||
msg = choice["message"]
|
||||
if "tool_calls" in msg and msg["tool_calls"]:
|
||||
tc = msg["tool_calls"][0]
|
||||
assert tc["type"] == "function"
|
||||
assert tc["function"]["name"] == "read_file"
|
||||
args = json.loads(tc["function"]["arguments"])
|
||||
assert "path" in args
|
||||
else:
|
||||
assert len(msg.get("content", "")) > 0
|
||||
|
||||
def test_tool_call_with_multiple_tools(self):
|
||||
"""Model must handle multiple available tools."""
|
||||
import requests
|
||||
url = os.environ["TURBOQUANT_SERVER_URL"]
|
||||
tools = [
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "read_file",
|
||||
"description": "Read a file",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {"path": {"type": "string"}},
|
||||
"required": ["path"],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "web_search",
|
||||
"description": "Search the web",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {"query": {"type": "string"}},
|
||||
"required": ["query"],
|
||||
},
|
||||
},
|
||||
},
|
||||
{
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": "execute_code",
|
||||
"description": "Run Python code",
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {"code": {"type": "string"}},
|
||||
"required": ["code"],
|
||||
},
|
||||
},
|
||||
},
|
||||
]
|
||||
resp = requests.post(
|
||||
f"{url}/v1/chat/completions",
|
||||
json={
|
||||
"model": "gemma-4",
|
||||
"messages": [
|
||||
{"role": "user", "content": "Search the web for 'bitcoin price'"}
|
||||
],
|
||||
"tools": tools,
|
||||
"tool_choice": "auto",
|
||||
},
|
||||
timeout=120,
|
||||
)
|
||||
assert resp.status_code == 200
|
||||
data = resp.json()
|
||||
assert "choices" in data
|
||||
assert len(data["choices"]) > 0
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
225
tests/test_tool_call_regression.py
Normal file
225
tests/test_tool_call_regression.py
Normal file
@@ -0,0 +1,225 @@
|
||||
"""
|
||||
TurboQuant Compressed Model Tool Call Regression Suite — Issue #96
|
||||
|
||||
Run: pytest tests/tool_call_regression.py -v
|
||||
Generate matrix: pytest tests/tool_call_regression.py --generate-matrix
|
||||
"""
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import re
|
||||
import time
|
||||
import unittest
|
||||
from typing import Dict
|
||||
|
||||
import pytest
|
||||
|
||||
ROOT = pathlib.Path(__file__).resolve().parents[1]
|
||||
BENCHMARKS_DIR = ROOT / "benchmarks"
|
||||
RESULTS_MATRIX = BENCHMARKS_DIR / "tool-call-regression.md"
|
||||
|
||||
CORE_TOOLS = [
|
||||
{"name": "read_file", "description": "Read a text file", "args": {"path": "/tmp/test.txt"}},
|
||||
{"name": "web_search", "description": "Search the web", "args": {"query": "turboquant"}},
|
||||
{"name": "terminal", "description": "Run a shell command", "args": {"command": "echo ok"}},
|
||||
{"name": "execute_code", "description": "Run Python code", "args": {"code": "print(1)"}},
|
||||
{"name": "delegate_task", "description": "Delegate to subagent", "args": {"goal": "test"}},
|
||||
]
|
||||
|
||||
PARALLEL_TOOLS = [
|
||||
{"name": "read_file", "args": {"path": "/tmp/a.txt"}},
|
||||
{"name": "web_search", "args": {"query": "python"}},
|
||||
{"name": "execute_code", "args": {"code": "x=1"}},
|
||||
]
|
||||
|
||||
PASS_THRESHOLD = 0.95
|
||||
|
||||
|
||||
class TestToolSchemaContract(unittest.TestCase):
|
||||
def test_core_tool_schemas_are_valid_functions(self):
|
||||
for tool in CORE_TOOLS:
|
||||
schema = {
|
||||
"type": "function",
|
||||
"function": {
|
||||
"name": tool["name"],
|
||||
"description": tool["description"],
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {},
|
||||
"required": list(tool["args"].keys()),
|
||||
},
|
||||
},
|
||||
}
|
||||
parsed = json.loads(json.dumps(schema))
|
||||
assert parsed["type"] == "function"
|
||||
fn = parsed["function"]
|
||||
assert fn["name"] == tool["name"]
|
||||
assert fn["description"]
|
||||
assert "parameters" in fn
|
||||
|
||||
def test_parallel_tool_set_is_unique(self):
|
||||
names = [t["name"] for t in PARALLEL_TOOLS]
|
||||
assert len(names) == len(set(names))
|
||||
|
||||
def test_tool_call_response_format(self):
|
||||
tc = {"id": "call_abc", "type": "function",
|
||||
"function": {"name": "read_file", "arguments": json.dumps({"path": "/tmp/test.txt"})}}
|
||||
assert tc["type"] == "function"
|
||||
args = json.loads(tc["function"]["arguments"])
|
||||
assert "path" in args
|
||||
|
||||
def test_parallel_response_contains_multiple_calls(self):
|
||||
calls = [
|
||||
{"id": "c1", "type": "function", "function": {"name": "read_file", "arguments": "{}"}},
|
||||
{"id": "c2", "type": "function", "function": {"name": "web_search", "arguments": "{}"}},
|
||||
{"id": "c3", "type": "function", "function": {"name": "execute_code","arguments": "{}"}},
|
||||
]
|
||||
assert len(calls) >= 3
|
||||
call_names = {c["function"]["name"] for c in calls}
|
||||
assert len(call_names) >= 2
|
||||
|
||||
|
||||
class TestProfileConfig(unittest.TestCase):
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
import yaml
|
||||
cls.profile = yaml.safe_load((ROOT / "profiles" / "hermes-profile-gemma4-turboquant.yaml").read_text())
|
||||
|
||||
def test_primary_provider_has_all_required_fields(self):
|
||||
"""Provider must have model, endpoint, and turboquant config."""
|
||||
p = self.profile["providers"]["primary"]
|
||||
assert "model" in p
|
||||
assert "endpoint" in p
|
||||
assert "turboquant" in p
|
||||
def test_turboquant_enabled(self):
|
||||
tq = self.profile["providers"]["primary"].get("turboquant", {})
|
||||
assert tq.get("enabled") is True
|
||||
assert tq.get("kv_type") in ("turbo2", "turbo3", "turbo4")
|
||||
|
||||
def test_server_command_has_turboquant_flags(self):
|
||||
cmd = self.profile["providers"]["primary"].get("server_command", "")
|
||||
assert "-ctk" in cmd and "-ctv" in cmd
|
||||
|
||||
|
||||
@pytest.mark.skipif(
|
||||
not os.environ.get("TURBOQUANT_SERVER_URL"),
|
||||
reason="Set TURBOQUANT_SERVER_URL to run live regression"
|
||||
)
|
||||
class TestLiveRegression:
|
||||
RESULTS: Dict[str, bool] = {}
|
||||
|
||||
def _call_model(self, tools, prompt, timeout=120):
|
||||
import requests
|
||||
url = os.environ["TURBOQUANT_SERVER_URL"]
|
||||
resp = requests.post(
|
||||
f"{url}/v1/chat/completions",
|
||||
json={"model": "gemma-4", "messages": [{"role": "user", "content": prompt}],
|
||||
"tools": tools, "tool_choice": "auto"},
|
||||
timeout=timeout,
|
||||
)
|
||||
resp.raise_for_status()
|
||||
return resp.json()
|
||||
|
||||
def _has_valid_tool_call(self, data, expected_name):
|
||||
msg = data["choices"][0]["message"]
|
||||
for tc in msg.get("tool_calls", []):
|
||||
if tc["function"]["name"] == expected_name:
|
||||
json.loads(tc["function"]["arguments"])
|
||||
return True
|
||||
return False
|
||||
|
||||
def test_read_file(self):
|
||||
tools = [{"type":"function","function":{"name":"read_file","description":"Read file",
|
||||
"parameters":{"type":"object","properties":{"path":{"type":"string"}},"required":["path"]}}}]
|
||||
data = self._call_model(tools, "Read /tmp/test.txt")
|
||||
self.__class__.RESULTS["read_file"] = self._has_valid_tool_call(data, "read_file")
|
||||
|
||||
def test_web_search(self):
|
||||
tools = [{"type":"function","function":{"name":"web_search","description":"Search",
|
||||
"parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}}}]
|
||||
data = self._call_model(tools, "Search for Python")
|
||||
self.__class__.RESULTS["web_search"] = self._has_valid_tool_call(data, "web_search")
|
||||
|
||||
def test_terminal(self):
|
||||
tools = [{"type":"function","function":{"name":"terminal","description":"Shell",
|
||||
"parameters":{"type":"object","properties":{"command":{"type":"string"}},"required":["command"]}}}]
|
||||
data = self._call_model(tools, "List files")
|
||||
self.__class__.RESULTS["terminal"] = self._has_valid_tool_call(data, "terminal")
|
||||
|
||||
def test_execute_code(self):
|
||||
tools = [{"type":"function","function":{"name":"execute_code","description":"Code",
|
||||
"parameters":{"type":"object","properties":{"code":{"type":"string"}},"required":["code"]}}}]
|
||||
data = self._call_model(tools, "Run: print('test')")
|
||||
self.__class__.RESULTS["execute_code"] = self._has_valid_tool_call(data, "execute_code")
|
||||
|
||||
def test_delegate_task(self):
|
||||
tools = [{"type":"function","function":{"name":"delegate_task","description":"Delegate",
|
||||
"parameters":{"type":"object","properties":{"goal":{"type":"string"}},"required":["goal"]}}}]
|
||||
data = self._call_model(tools, "Delegate task: test")
|
||||
self.__class__.RESULTS["delegate_task"] = self._has_valid_tool_call(data, "delegate_task")
|
||||
|
||||
def test_parallel_tool_calling(self):
|
||||
tools = [
|
||||
{"type":"function","function":{"name":"read_file","description":"Read",
|
||||
"parameters":{"type":"object","properties":{"path":{"type":"string"}},"required":["path"]}},},
|
||||
{"type":"function","function":{"name":"web_search","description":"Search",
|
||||
"parameters":{"type":"object","properties":{"query":{"type":"string"}},"required":["query"]}},},
|
||||
{"type":"function","function":{"name":"execute_code","description":"Code",
|
||||
"parameters":{"type":"object","properties":{"code":{"type":"string"}},"required":["code"]}},},
|
||||
]
|
||||
data = self._call_model(tools, "Read a.txt, search python, run code")
|
||||
msg = data["choices"][0]["message"]
|
||||
calls = msg.get("tool_calls", [])
|
||||
names = {c["function"]["name"] for c in calls}
|
||||
self.__class__.RESULTS["parallel"] = len(names) >= 2
|
||||
|
||||
@classmethod
|
||||
def _accuracy(cls) -> float:
|
||||
if not cls.RESULTS:
|
||||
return 1.0
|
||||
return sum(1 for v in cls.RESULTS.values() if v) / len(cls.RESULTS)
|
||||
|
||||
@classmethod
|
||||
def teardown_class(cls):
|
||||
acc = cls._accuracy()
|
||||
print(f"\nTool Call Regression Accuracy: {acc*100:.1f}% (threshold {PASS_THRESHOLD*100:.0f}%)")
|
||||
for name, passed in cls.RESULTS.items():
|
||||
print(f" {name}: {'PASS' if passed else 'FAIL'}")
|
||||
assert acc >= PASS_THRESHOLD, f"Accuracy {acc*100:.1f}% below {PASS_THRESHOLD*100:.0f}% gate"
|
||||
if os.environ.get("GENERATE_MATRIX"):
|
||||
_append_matrix(acc, cls.RESULTS)
|
||||
|
||||
|
||||
def _append_matrix(accuracy: float, results: Dict[str, bool]):
|
||||
timestamp = time.strftime("%Y-%m-%d %H:%M UTC", time.gmtime())
|
||||
tool_names = [t["name"] for t in CORE_TOOLS]
|
||||
tool_checks = ["✓" if results.get(n, False) else "✗" for n in tool_names]
|
||||
parallel_check = "✓" if results.get("parallel") else "✗"
|
||||
row = f"| {timestamp} | gemma-4 | turbo4 | {accuracy*100:.1f}% | " + " | ".join(tool_checks) + f" | {parallel_check} |\n"
|
||||
header = (
|
||||
"| Timestamp | Model | Preset | Accuracy | "
|
||||
+ " | ".join(tool_names)
|
||||
+ " | Parallel |\n"
|
||||
"|-----------|-------|--------|----------|"
|
||||
+ "---|" * (len(tool_names) + 1) + "\n"
|
||||
)
|
||||
if not RESULTS_MATRIX.exists():
|
||||
RESULTS_MATRIX.write_text(header + row)
|
||||
else:
|
||||
content = RESULTS_MATRIX.read_text()
|
||||
if header not in content:
|
||||
content = header + row + content
|
||||
else:
|
||||
content = header + row + content.split(header, 1)[1]
|
||||
RESULTS_MATRIX.write_text(content)
|
||||
print(f"Matrix updated: {RESULTS_MATRIX}")
|
||||
|
||||
|
||||
def pytest_addoption(parser):
|
||||
parser.addoption("--generate-matrix", action="store_true",
|
||||
help="Update benchmarks/tool-call-regression.md with live results")
|
||||
|
||||
|
||||
def pytest_configure(config):
|
||||
if config.getoption("--generate-matrix"):
|
||||
os.environ["GENERATE_MATRIX"] = "1"
|
||||
Reference in New Issue
Block a user