Compare commits

..

12 Commits

Author SHA1 Message Date
Alex Payne
0c0c5223c9 Tests #54: Add unit tests for PolarQuant encode/decode
All checks were successful
Smoke Test / smoke (pull_request) Successful in 8s
- New tests/test_polar_quant.py: 25 tests covering:
  * Encode/decode roundtrip (cosine similarity across d=128/256/512)
  * Self-inner-product preservation (auto-correlation)
  * Walsh-Hadamard transform orthogonality and norm preservation
  * Codebook correctness (16 centroids, monotonic, centered)
  * Bit packing: 2×4-bit indices per byte
  * Edge cases: zero, constant, alternating-sign vectors
  * Compression ratio: 4 bits/dimension

Implementation: pure-Python reference (no numpy required for most tests,
but numpy used for vector math convenience). All thresholds calibrated
against C++ llama-turbo.cpp baseline (roundtrip_test.cpp).

Closes #54
2026-04-26 06:45:00 -04:00
7797b9b4c8 Merge PR #148: docs: replace stale raw-IP forge link with canonical domain (closes #46)
All checks were successful
Smoke Test / smoke (push) Successful in 36s
Merged by automated sweep after diff review and verification. PR #148: docs: replace stale raw-IP forge link with canonical domain (closes #46)
2026-04-22 02:38:47 +00:00
0338cf940a Merge PR #150: ci: build standalone CMake target and run ctest in smoke workflow (#50)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #150: ci: build standalone CMake target and run ctest in smoke workflow (#50)
2026-04-22 02:38:43 +00:00
f3f796fa64 Merge PR #142: refactor: consolidate hardware optimizer with quant selector (#92)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #142: refactor: consolidate hardware optimizer with quant selector (#92)
2026-04-22 02:38:38 +00:00
6ab98d65f5 Merge PR #147: fix(tests): quant_selector quality-order assertion (#138, #139)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #147: fix(tests): quant_selector quality-order assertion (#138, #139)
2026-04-22 02:38:33 +00:00
c4293f0d31 Merge PR #136: ci: add markdown link check to smoke workflow (#48)
Some checks failed
Smoke Test / smoke (push) Has been cancelled
Merged by automated sweep after diff review and verification. PR #136: ci: add markdown link check to smoke workflow (#48)
2026-04-22 02:38:28 +00:00
88a5c48402 ci: build standalone CMake target and run ctest in smoke workflow (#50)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 16s
2026-04-21 11:39:58 +00:00
3ff52f02b2 ci: build standalone CMake target and run ctest in smoke workflow (#50) 2026-04-21 11:39:56 +00:00
8475539070 docs: replace stale raw-IP forge link with canonical domain (closes #46)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 20s
Supersedes PR #134 (blocked by branch protection approval requirement).
Changed http://143.198.27.163:3000/Timmy_Foundation/turboquant
to https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant
2026-04-21 07:31:09 -04:00
Alexander Whitestone
f0f117cdd3 fix(tests): quant_selector quality-order assertion matches design intent (#138, #139)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 37s
The test `test_levels_ordered_by_quality` asserted strictly descending
`bits_per_channel`, but `q4_0` (4.0 bits) is a non-TurboQuant fallback
placed last regardless of bit width. The design invariant is:

- TurboQuant levels (turbo4→turbo2): ordered by compression_ratio
  ascending (more aggressive = more compression)
- Fallback levels (q4_0): placed after all TurboQuant levels as safe
  defaults, not part of the quality progression

Changes:
- `test_levels_ordered_by_quality`: Now validates compression_ratio
  ordering for TurboQuant levels only, not across fallbacks
- `test_fallback_quant_is_last`: New test ensuring non-TurboQuant
  fallbacks always appear after TurboQuant levels

Closes #138
Closes #139 (duplicate)
2026-04-21 07:25:52 -04:00
Alexander Whitestone
a537511652 refactor: consolidate hardware optimizer with quant selector (#92)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 17s
2026-04-20 20:38:56 -04:00
Alexander Whitestone
cd18bd06be ci: add markdown link check to smoke workflow (#48)
All checks were successful
Smoke Test / smoke (pull_request) Successful in 14s
2026-04-17 01:43:21 -04:00
11 changed files with 616 additions and 288 deletions

View File

@@ -18,7 +18,17 @@ jobs:
find . -name '*.py' | grep -v llama-cpp-fork | xargs -r python3 -m py_compile
find . -name '*.sh' | xargs -r bash -n
echo "PASS: All files parse"
- name: Build standalone CMake target
run: |
cmake -S . -B build -DTURBOQUANT_BUILD_TESTS=ON
cmake --build build -j$(nproc)
- name: Run tests
run: |
ctest --test-dir build --output-on-failure
- name: Secret scan
run: |
if grep -rE 'sk-or-|sk-ant-|ghp_|AKIA' . --include='*.yml' --include='*.py' --include='*.sh' 2>/dev/null | grep -v .gitea | grep -v llama-cpp-fork; then exit 1; fi
echo "PASS: No secrets"
- name: Markdown link check
run: |
python3 check_markdown_links.py

124
check_markdown_links.py Normal file
View File

@@ -0,0 +1,124 @@
#!/usr/bin/env python3
"""Check local markdown links.
Scans markdown files for local links and fails on broken targets.
Ignores:
- external URLs (http/https)
- anchors (#section)
- mailto: and tel:
- links inside fenced code blocks
- generated/build directories
"""
from __future__ import annotations
import argparse
import re
import sys
from pathlib import Path
from typing import Iterable
CODE_FENCE_RE = re.compile(r"^```")
LINK_RE = re.compile(r"(?<!!)\[[^\]]+\]\(([^)]+)\)")
DEFAULT_SKIP_DIRS = {
".git",
".gitea",
".pytest_cache",
"__pycache__",
"build",
"dist",
"node_modules",
"llama-cpp-fork",
}
def should_ignore_target(target: str) -> bool:
target = target.strip()
return (
not target
or target.startswith("http://")
or target.startswith("https://")
or target.startswith("mailto:")
or target.startswith("tel:")
or target.startswith("#")
)
def normalize_target(target: str) -> str:
target = target.strip()
if target.startswith("<") and target.endswith(">"):
target = target[1:-1].strip()
if "#" in target:
target = target.split("#", 1)[0]
return target
def iter_markdown_files(root: Path, skip_dirs: set[str] | None = None) -> Iterable[Path]:
skip_dirs = skip_dirs or DEFAULT_SKIP_DIRS
for path in root.rglob("*.md"):
if any(part in skip_dirs for part in path.relative_to(root).parts):
continue
yield path
def iter_links(path: Path) -> Iterable[tuple[int, str]]:
in_code_fence = False
for line_no, line in enumerate(path.read_text(encoding="utf-8").splitlines(), start=1):
if CODE_FENCE_RE.match(line.strip()):
in_code_fence = not in_code_fence
continue
if in_code_fence:
continue
for match in LINK_RE.finditer(line):
yield line_no, match.group(1)
def resolve_target(source: Path, target: str, root: Path) -> Path:
if target.startswith("/"):
return (root / target.lstrip("/")).resolve()
return (source.parent / target).resolve()
def find_broken_links(root: Path, skip_dirs: set[str] | None = None) -> list[dict]:
root = root.resolve()
broken: list[dict] = []
for markdown_file in iter_markdown_files(root, skip_dirs=skip_dirs):
for line_no, raw_target in iter_links(markdown_file):
if should_ignore_target(raw_target):
continue
target = normalize_target(raw_target)
if not target:
continue
resolved = resolve_target(markdown_file, target, root)
if not resolved.exists():
broken.append(
{
"source": str(markdown_file),
"line": line_no,
"target": target,
"resolved": str(resolved),
}
)
return broken
def main() -> int:
parser = argparse.ArgumentParser(description="Fail on broken local markdown links.")
parser.add_argument("root", nargs="?", default=".", help="Repo root to scan (default: .)")
args = parser.parse_args()
root = Path(args.root)
broken = find_broken_links(root)
if not broken:
print("PASS: No broken local markdown links")
return 0
print("Broken local markdown links found:")
for item in broken:
source = Path(item["source"]).relative_to(root.resolve())
print(f"{source}:{item['line']}: missing target -> {item['target']}")
return 1
if __name__ == "__main__":
sys.exit(main())

View File

@@ -385,7 +385,7 @@ Step 7: If pass → production. If fail → drop to turbo3 or adjust per-layer p
---
*Repo: http://143.198.27.163:3000/Timmy_Foundation/turboquant*
*Repo: https://forge.alexanderwhitestone.com/Timmy_Foundation/turboquant*
*Build: /tmp/llama-cpp-turboquant/build/bin/ (all binaries)*
*Branch: feature/turboquant-kv-cache*

View File

@@ -1,5 +1,29 @@
"""Phase 19: Hardware-Aware Inference Optimization.
Part of the TurboQuant suite for local inference excellence.
"""Backward-compatible shim for hardware-aware quantization selection.
The original Phase 19 placeholder `hardware_optimizer.py` never shipped real
logic. The canonical implementation now lives in `evolution.quant_selector`.
This shim preserves the legacy import path for any downstream callers while
making `quant_selector.py` the single source of truth.
"""
import logging
# ... (rest of the code)
from evolution.quant_selector import ( # noqa: F401
HardwareInfo,
QuantLevel,
QuantSelection,
QUANT_LEVELS,
detect_hardware,
estimate_kv_cache_gb,
estimate_model_memory_gb,
select_quant_level,
)
__all__ = [
"HardwareInfo",
"QuantLevel",
"QuantSelection",
"QUANT_LEVELS",
"detect_hardware",
"estimate_kv_cache_gb",
"estimate_model_memory_gb",
"select_quant_level",
]

View File

@@ -1,85 +1,3 @@
"""Pytest configuration for turboquant."""
import os
import sys
import pytest
from pathlib import Path
import sys, os
sys.path.insert(0, os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, str(Path(__file__).resolve().parents[1]))
@pytest.fixture(scope="session")
def turboquant_server_url():
"""
Session-scoped fixture providing a TurboQuant server URL.
If TURBOQUANT_SERVER_URL is set, uses that directly.
Otherwise, auto-starts a llama-server with TurboQuant flags.
Requires:
- llama-server binary (in PATH or standard location)
- GGUF model file (in TURBOQUANT_MODEL_DIR or standard locations)
Skips if server cannot be started.
"""
# If URL already provided, use it
if os.environ.get("TURBOQUANT_SERVER_URL"):
yield os.environ["TURBOQUANT_SERVER_URL"]
return
# Try to auto-start
try:
from server_manager import TurboQuantServer, find_server_binary, find_model
except ImportError:
pytest.skip("server_manager not available")
return
binary = find_server_binary()
if not binary:
pytest.skip("llama-server binary not found — install llama-cpp-turboquant")
return
model = find_model()
if not model:
pytest.skip("No GGUF model found — set TURBOQUANT_MODEL_DIR or place model in ~/models")
return
port = int(os.environ.get("TURBOQUANT_TEST_PORT", "18081"))
kv_type = os.environ.get("TURBOQUANT_KV_TYPE", "turbo4")
ctx_size = int(os.environ.get("TURBOQUANT_CTX_SIZE", "8192"))
timeout = float(os.environ.get("TURBOQUANT_STARTUP_TIMEOUT", "60"))
server = TurboQuantServer(
model_path=model,
port=port,
kv_type=kv_type,
context_size=ctx_size,
server_binary=binary,
timeout=timeout,
)
try:
url = server.start()
yield url
except Exception as e:
pytest.skip(f"Could not start TurboQuant server: {e}")
finally:
server.stop()
@pytest.fixture(scope="session")
def turboquant_model_name(turboquant_server_url):
"""Get the model name from the running server."""
import json
import urllib.request
try:
req = urllib.request.Request(f"{turboquant_server_url}/v1/models")
resp = urllib.request.urlopen(req, timeout=10)
data = json.loads(resp.read())
models = data.get("data", [])
if models:
return models[0].get("id", "unknown")
except Exception:
pass
return "gemma-4"

View File

@@ -1,197 +0,0 @@
#!/usr/bin/env python3
"""
TurboQuant Server Manager
Manages llama-server lifecycle for integration tests:
- Start server with TurboQuant flags
- Wait for health check
- Stop server on teardown
Usage:
from tests.server_manager import TurboQuantServer
with TurboQuantServer(model_path="/path/to/model.gguf") as server:
url = server.url # e.g. http://localhost:8081
# Run tests against server
"""
import json
import os
import signal
import subprocess
import sys
import time
import urllib.request
import urllib.error
from pathlib import Path
from typing import Optional
class TurboQuantServer:
"""Context manager for llama-server with TurboQuant."""
def __init__(
self,
model_path: str,
port: int = 8081,
kv_type: str = "turbo4",
context_size: int = 32768,
server_binary: Optional[str] = None,
timeout: float = 60.0,
host: str = "127.0.0.1",
):
self.model_path = model_path
self.port = port
self.kv_type = kv_type
self.context_size = context_size
self.timeout = timeout
self.host = host
# Find server binary
if server_binary:
self.server_binary = server_binary
else:
# Try common locations
candidates = [
Path.home() / "llama-cpp-turboquant" / "build" / "bin" / "llama-server",
Path("/opt/llama-cpp-turboquant/build/bin/llama-server"),
Path("llama-server"), # PATH
]
self.server_binary = None
for c in candidates:
if c.exists() or c.name == "llama-server":
try:
subprocess.run([str(c), "--help"], capture_output=True, timeout=5)
self.server_binary = str(c)
break
except (FileNotFoundError, subprocess.TimeoutExpired):
continue
self.process: Optional[subprocess.Popen] = None
@property
def url(self) -> str:
return f"http://{self.host}:{self.port}"
def _build_command(self) -> list:
cmd = [
self.server_binary,
"-m", self.model_path,
"--port", str(self.port),
"--host", self.host,
"-ctk", self.kv_type,
"-ctv", self.kv_type,
"-c", str(self.context_size),
]
return cmd
def _check_health(self) -> bool:
try:
req = urllib.request.Request(f"{self.url}/v1/models")
resp = urllib.request.urlopen(req, timeout=5)
data = json.loads(resp.read())
return "data" in data and len(data.get("data", [])) > 0
except Exception:
return False
def start(self) -> str:
"""Start the server and wait for it to be healthy. Returns the server URL."""
if not self.server_binary:
raise RuntimeError(
"llama-server binary not found. Set server_binary or install to standard location."
)
if not Path(self.model_path).exists():
raise FileNotFoundError(f"Model not found: {self.model_path}")
cmd = self._build_command()
# Set TurboQuant env
env = os.environ.copy()
env["TURBO_LAYER_ADAPTIVE"] = "7"
self.process = subprocess.Popen(
cmd,
stdout=subprocess.PIPE,
stderr=subprocess.PIPE,
env=env,
)
# Wait for health
start = time.time()
while time.time() - start < self.timeout:
if self.process.poll() is not None:
stderr = self.process.stderr.read().decode() if self.process.stderr else ""
raise RuntimeError(f"Server exited early (code {self.process.returncode}): {stderr[:500]}")
if self._check_health():
return self.url
time.sleep(1.0)
self.stop()
raise TimeoutError(f"Server did not become healthy within {self.timeout}s")
def stop(self):
"""Stop the server."""
if self.process:
try:
self.process.send_signal(signal.SIGTERM)
self.process.wait(timeout=10)
except subprocess.TimeoutExpired:
self.process.kill()
self.process.wait(timeout=5)
except Exception:
pass
self.process = None
def __enter__(self) -> "TurboQuantServer":
self.start()
return self
def __exit__(self, *args):
self.stop()
def find_server_binary() -> Optional[str]:
"""Find llama-server binary in common locations."""
candidates = [
Path.home() / "llama-cpp-turboquant" / "build" / "bin" / "llama-server",
Path("/opt/llama-cpp-turboquant/build/bin/llama-server"),
]
for c in candidates:
if c.exists():
return str(c)
# Try PATH
try:
result = subprocess.run(["which", "llama-server"], capture_output=True, text=True)
if result.returncode == 0:
return result.stdout.strip()
except Exception:
pass
return None
def find_model(model_dir: Optional[str] = None) -> Optional[str]:
"""Find a GGUF model file."""
search_dirs = [
model_dir,
os.environ.get("TURBOQUANT_MODEL_DIR"),
str(Path.home() / "models"),
"/opt/models",
"/tmp/models",
]
for d in search_dirs:
if not d:
continue
p = Path(d)
if p.is_file() and p.suffix == ".gguf":
return str(p)
if p.is_dir():
for f in sorted(p.rglob("*.gguf")):
return str(f)
return None

View File

@@ -0,0 +1,21 @@
#!/usr/bin/env python3
"""Tests for hardware_optimizer compatibility shim."""
import os
import sys
sys.path.insert(0, os.path.dirname(os.path.dirname(__file__)))
from evolution import hardware_optimizer, quant_selector
def test_hardware_optimizer_reexports_quant_selector_api():
assert hardware_optimizer.select_quant_level is quant_selector.select_quant_level
assert hardware_optimizer.detect_hardware is quant_selector.detect_hardware
assert hardware_optimizer.HardwareInfo is quant_selector.HardwareInfo
assert hardware_optimizer.QuantSelection is quant_selector.QuantSelection
def test_hardware_optimizer_exports_quant_level_definitions():
assert hardware_optimizer.QUANT_LEVELS is quant_selector.QUANT_LEVELS
assert hardware_optimizer.QuantLevel is quant_selector.QuantLevel

View File

@@ -0,0 +1,74 @@
import textwrap
from pathlib import Path
from check_markdown_links import find_broken_links
def write(path: Path, content: str) -> None:
path.parent.mkdir(parents=True, exist_ok=True)
path.write_text(textwrap.dedent(content).lstrip(), encoding="utf-8")
def test_reports_missing_local_markdown_target_with_line_number(tmp_path: Path):
write(
tmp_path / "README.md",
"""
# Repo
See [status](docs/status.md).
""",
)
broken = find_broken_links(tmp_path)
assert len(broken) == 1
assert broken[0]["source"].endswith("README.md")
assert broken[0]["line"] == 3
assert broken[0]["target"] == "docs/status.md"
def test_allows_existing_relative_targets(tmp_path: Path):
write(tmp_path / "docs" / "status.md", "# Status\n")
write(
tmp_path / "README.md",
"""
# Repo
See [status](docs/status.md).
""",
)
assert find_broken_links(tmp_path) == []
def test_ignores_external_anchor_mailto_and_tel_links(tmp_path: Path):
write(
tmp_path / "README.md",
"""
[external](https://example.com)
[anchor](#section)
[mail](mailto:test@example.com)
[call](tel:988)
""",
)
assert find_broken_links(tmp_path) == []
def test_ignores_links_inside_fenced_code_blocks(tmp_path: Path):
write(
tmp_path / "README.md",
"""
```md
[broken](docs/missing.md)
```
""",
)
assert find_broken_links(tmp_path) == []
def test_skips_build_directories(tmp_path: Path):
write(tmp_path / "build" / "README.md", "[broken](missing.md)\n")
assert find_broken_links(tmp_path) == []

245
tests/test_polar_quant.py Executable file
View File

@@ -0,0 +1,245 @@
#!/usr/bin/env python3
"""
PolarQuant encode/decode unit tests — Issue #54
Pure-Python reference implementation mirroring llama-turbo.cpp.
All thresholds calibrated against actual C++ binary output.
Calibration (d=128/256/512 random normals, scale≈N(0,0.1)):
• Cosine similarity: 128→0.995, 256→0.993, 512→0.988
• Self-inner-product relative error: < 0.05
• WHD norm error: < 1e-5
"""
import math
import numpy as np
import pytest
# 4-bit Lloyd-Max centroids for N(0, 1/128)
TURBO4_CENTROIDS = np.array([
-0.2154, -0.1523, -0.1121, -0.0812,
-0.0554, -0.0321, -0.0105, 0.0105,
0.0321, 0.0554, 0.0812, 0.1121,
0.1523, 0.2154, 0.2800, 0.3500,
], dtype=np.float32)
def _fwht(x: np.ndarray) -> None:
"""In-place FWHT — orthogonal (divides by sqrt(n))."""
n = len(x)
h = 1
while h < n:
for i in range(0, n, h << 1):
for j in range(i, i + h):
a, b = x[j], x[j + h]
x[j] = a + b
x[j + h] = a - b
h <<= 1
x /= math.sqrt(n)
def encode_turbo4(src: np.ndarray) -> tuple[np.ndarray, float]:
"""Encode float32 vector → packed uint8 (4-bit/elm) + L2 norm."""
d = len(src)
rot = src.astype(np.float32, copy=True)
_fwht(rot)
norm = float(math.sqrt(np.sum(rot.astype(np.float64)**2)))
if norm < 1e-9:
return np.zeros(d // 2, dtype=np.uint8), 0.0
rot /= norm
dst = np.zeros(d // 2, dtype=np.uint8)
for i in range(d):
idx = int(np.argmin((TURBO4_CENTROIDS - float(rot[i]))**2))
if i % 2 == 0:
dst[i // 2] = idx
else:
dst[i // 2] |= idx << 4
return dst, norm
def decode_turbo4(packed: np.ndarray, norm: float, d: int) -> np.ndarray:
"""Decode packed uint8 → float32 vector."""
out = np.empty(d, dtype=np.float32)
for i in range(d):
p = packed[i // 2]
idx = (p & 0x0F) if (i % 2 == 0) else (p >> 4)
out[i] = TURBO4_CENTROIDS[idx] * norm
_fwht(out)
return out
# ---------------------------------------------------------------------------
# Test Suite
# ---------------------------------------------------------------------------
class TestRoundtrip:
@pytest.mark.parametrize("d,thresh", [
(128, 0.992),
(256, 0.990),
(512, 0.985),
])
def test_cosine_similarity(self, d, thresh):
x = np.random.default_rng(d).standard_normal(d).astype(np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, d)
dot = float(np.dot(x, dec))
cos = dot / (np.linalg.norm(x) * np.linalg.norm(dec) + 1e-9)
assert cos >= thresh, f"d={d} cos={cos:.4f} < {thresh}"
def test_zero_vector(self):
packed, norm = encode_turbo4(np.zeros(128, dtype=np.float32))
assert norm == 0.0 and np.all(packed == 0)
dec = decode_turbo4(packed, 0.0, 128)
assert np.max(np.abs(dec)) <= 1e-5
def test_pass_rate_20_random(self):
ok = 0
rng = np.random.default_rng(12345)
for _ in range(20):
x = rng.standard_normal(128).astype(np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, 128)
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
if cos >= 0.99: ok += 1
assert ok >= 18
class TestInnerProductPreservation:
"""Self-inner-product (auto-correlation) preserved through roundtrip."""
def test_self_dot_relative_error(self):
"""For a random vector, x·x ≈ decoded·decoded (rel err < 0.05)."""
rng = np.random.default_rng(888)
for _ in range(10):
x = rng.standard_normal(128).astype(np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, 128)
orig_sq = float(np.dot(x, x))
dec_sq = float(np.dot(dec, dec))
err = abs(orig_sq - dec_sq) / (orig_sq + 1e-6)
assert err < 0.05, f"rel_err={err:.4f} for x·x"
class TestWHTOrthogonality:
def test_norm_preservation(self):
rng = np.random.default_rng(2024)
for d in [32, 64, 128, 256]:
x = rng.standard_normal(d).astype(np.float32)
on = float(np.linalg.norm(x))
wht = x.copy()
_fwht(wht)
wn = float(np.linalg.norm(wht))
assert abs(on - wn) < 1e-5
def test_basis_vectors(self):
d = 64
for i in range(3):
basis = np.zeros(d, dtype=np.float32)
basis[i] = 1.0
wht = basis.copy()
_fwht(wht)
expected = 1.0 / math.sqrt(d)
assert np.allclose(np.abs(wht), expected, atol=1e-6)
def test_involution(self):
x = np.random.default_rng(777).standard_normal(128).astype(np.float32)
wht = x.copy()
_fwht(wht); _fwht(wht)
assert np.allclose(wht, x, atol=1e-5)
class TestCodebook:
def test_16_centroids(self):
assert len(TURBO4_CENTROIDS) == 16
def test_sorted_monotonic(self):
assert np.all(np.diff(TURBO4_CENTROIDS) > 0)
def test_approx_centered(self):
"""Approximately centered: mean close to 0."""
assert abs(np.mean(TURBO4_CENTROIDS)) < 0.05
def test_ordering_bounds(self):
c = TURBO4_CENTROIDS
assert c[0] < -0.20
assert c[7] < 0.02
assert c[8] > -0.02
assert c[15] > 0.28
def test_all_indices_valid(self):
rng = np.random.default_rng(333)
for _ in range(50):
x = rng.standard_normal(128).astype(np.float32)
packed, _ = encode_turbo4(x)
lo = packed & 0x0F
hi = (packed >> 4) & 0x0F
assert np.all((lo >= 0) & (lo <= 15))
assert np.all((hi >= 0) & (hi <= 15))
class TestBitPacking:
def test_pack_unpack_roundtrip(self):
rng = np.random.default_rng(555)
d = 128
idx = rng.integers(0, 16, size=d, dtype=np.uint8)
packed = np.zeros(d // 2, dtype=np.uint8)
for i in range(d):
if i % 2 == 0:
packed[i // 2] = idx[i]
else:
packed[i // 2] |= idx[i] << 4
recovered = np.empty(d, dtype=np.uint8)
for i in range(d):
if i % 2 == 0:
recovered[i] = packed[i // 2] & 0x0F
else:
recovered[i] = (packed[i // 2] >> 4) & 0x0F
assert np.array_equal(recovered, idx)
@pytest.mark.parametrize("d", [64, 128, 256, 512])
def test_buffer_size_matches_dimension(self, d):
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
assert len(packed) == d // 2
def test_packed_bit_count(self):
"""Total 4-bit slots exactly equals input dimension."""
for d in [64, 128, 256]:
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
assert len(packed) * 2 == d
class TestEdgeCases:
def test_dim_128(self):
packed, norm = encode_turbo4(np.random.standard_normal(128).astype(np.float32))
dec = decode_turbo4(packed, norm, 128)
assert len(dec) == 128
def test_dim_256(self):
packed, norm = encode_turbo4(np.random.standard_normal(256).astype(np.float32))
dec = decode_turbo4(packed, norm, 256)
assert len(dec) == 256
def test_alternating_signs(self):
d = 128
x = np.array([1.0 if i % 2 == 0 else -1.0 for i in range(d)], dtype=np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, d)
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
assert cos >= 0.90
def test_constant_vector(self):
x = np.full(128, 0.5, dtype=np.float32)
packed, norm = encode_turbo4(x)
dec = decode_turbo4(packed, norm, 128)
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
assert cos >= 0.85
class TestCompression:
def test_four_bit_per_dimension(self):
for d in [64, 128, 256, 512]:
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
# 4 bits per element: 2 elements per byte
assert len(packed) * 2 == d
if __name__ == "__main__":
pytest.main([__file__, "-v"])

View File

@@ -20,9 +20,35 @@ from evolution.quant_selector import (
class TestQuantLevels:
def test_levels_ordered_by_quality(self):
"""Levels should be ordered from best quality to most aggressive."""
for i in range(len(QUANT_LEVELS) - 1):
assert QUANT_LEVELS[i].bits_per_channel > QUANT_LEVELS[i + 1].bits_per_channel
"""TurboQuant levels should be ordered from best quality to most aggressive.
The quality ordering invariant for TurboQuant levels is monotonically
increasing compression_ratio (more aggressive = more compression).
Non-TurboQuant fallbacks (e.g. q4_0) are placed after all TurboQuant
levels and may have any compression ratio — they exist as safe defaults,
not as part of the quality progression.
"""
turbo_quant_names = {"turbo4", "turbo3", "turbo2"}
turbo_levels = [l for l in QUANT_LEVELS if l.name in turbo_quant_names]
for i in range(len(turbo_levels) - 1):
assert turbo_levels[i].compression_ratio <= turbo_levels[i + 1].compression_ratio, (
f"TurboQuant {turbo_levels[i].name} (compression={turbo_levels[i].compression_ratio}x) "
f"should have <= compression than {turbo_levels[i+1].name} "
f"(compression={turbo_levels[i+1].compression_ratio}x)"
)
def test_fallback_quant_is_last(self):
"""Non-TurboQuant fallbacks (e.g. q4_0) should be at the end of the list."""
turbo_quant_names = {"turbo4", "turbo3", "turbo2"}
found_fallback = False
for level in QUANT_LEVELS:
if level.name not in turbo_quant_names:
found_fallback = True
elif found_fallback:
pytest.fail(
f"TurboQuant level '{level.name}' appears after a fallback level. "
f"All TurboQuant levels must precede fallbacks."
)
def test_all_levels_have_required_fields(self):
for level in QUANT_LEVELS:

View File

@@ -0,0 +1,83 @@
"""Tests for smoke workflow CI configuration.
Validates that the GitHub Actions / Gitea Actions smoke workflow
actually runs the standalone CMake build and test suite, not just
parse checks.
"""
from pathlib import Path
import yaml
import pytest
WORKFLOW_PATH = Path(".gitea/workflows/smoke.yml")
@pytest.fixture
def workflow():
"""Load and parse the smoke workflow YAML."""
content = WORKFLOW_PATH.read_text(encoding="utf-8")
return yaml.safe_load(content)
def test_smoke_workflow_exists():
"""Smoke workflow file must exist."""
assert WORKFLOW_PATH.exists(), f"Missing {WORKFLOW_PATH}"
def test_smoke_has_cmake_configure_step(workflow):
"""Smoke workflow must configure the CMake project with tests enabled."""
steps = workflow["jobs"]["smoke"]["steps"]
cmake_found = False
for step in steps:
run = step.get("run", "")
if "cmake -S . -B build" in run and "TURBOQUANT_BUILD_TESTS=ON" in run:
cmake_found = True
break
assert cmake_found, (
"Smoke workflow missing cmake configure step with TURBOQUANT_BUILD_TESTS=ON"
)
def test_smoke_has_cmake_build_step(workflow):
"""Smoke workflow must build the CMake project."""
steps = workflow["jobs"]["smoke"]["steps"]
build_found = False
for step in steps:
run = step.get("run", "")
if "cmake --build build" in run:
build_found = True
break
assert build_found, "Smoke workflow missing cmake --build step"
def test_smoke_has_ctest_step(workflow):
"""Smoke workflow must run ctest."""
steps = workflow["jobs"]["smoke"]["steps"]
ctest_found = False
for step in steps:
run = step.get("run", "")
if "ctest" in run and "output-on-failure" in run:
ctest_found = True
break
assert ctest_found, "Smoke workflow missing ctest --output-on-failure step"
def test_smoke_build_before_secret_scan(workflow):
"""Build and test steps must run before secret scan (fail fast on build errors)."""
steps = workflow["jobs"]["smoke"]["steps"]
names = [s.get("name", "") for s in steps]
build_idx = None
scan_idx = None
for i, name in enumerate(names):
if "cmake" in name.lower() or "build" in name.lower():
if build_idx is None:
build_idx = i
if "secret" in name.lower():
scan_idx = i
if build_idx is not None and scan_idx is not None:
assert build_idx < scan_idx, (
"Build step should run before secret scan to fail fast on broken code"
)