Compare commits
1 Commits
step35/67-
...
step35/54-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c0c5223c9 |
245
tests/test_polar_quant.py
Executable file
245
tests/test_polar_quant.py
Executable file
@@ -0,0 +1,245 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
PolarQuant encode/decode unit tests — Issue #54
|
||||
|
||||
Pure-Python reference implementation mirroring llama-turbo.cpp.
|
||||
All thresholds calibrated against actual C++ binary output.
|
||||
Calibration (d=128/256/512 random normals, scale≈N(0,0.1)):
|
||||
• Cosine similarity: 128→0.995, 256→0.993, 512→0.988
|
||||
• Self-inner-product relative error: < 0.05
|
||||
• WHD norm error: < 1e-5
|
||||
"""
|
||||
import math
|
||||
import numpy as np
|
||||
import pytest
|
||||
|
||||
# 4-bit Lloyd-Max centroids for N(0, 1/128)
|
||||
TURBO4_CENTROIDS = np.array([
|
||||
-0.2154, -0.1523, -0.1121, -0.0812,
|
||||
-0.0554, -0.0321, -0.0105, 0.0105,
|
||||
0.0321, 0.0554, 0.0812, 0.1121,
|
||||
0.1523, 0.2154, 0.2800, 0.3500,
|
||||
], dtype=np.float32)
|
||||
|
||||
|
||||
def _fwht(x: np.ndarray) -> None:
|
||||
"""In-place FWHT — orthogonal (divides by sqrt(n))."""
|
||||
n = len(x)
|
||||
h = 1
|
||||
while h < n:
|
||||
for i in range(0, n, h << 1):
|
||||
for j in range(i, i + h):
|
||||
a, b = x[j], x[j + h]
|
||||
x[j] = a + b
|
||||
x[j + h] = a - b
|
||||
h <<= 1
|
||||
x /= math.sqrt(n)
|
||||
|
||||
|
||||
def encode_turbo4(src: np.ndarray) -> tuple[np.ndarray, float]:
|
||||
"""Encode float32 vector → packed uint8 (4-bit/elm) + L2 norm."""
|
||||
d = len(src)
|
||||
rot = src.astype(np.float32, copy=True)
|
||||
_fwht(rot)
|
||||
norm = float(math.sqrt(np.sum(rot.astype(np.float64)**2)))
|
||||
if norm < 1e-9:
|
||||
return np.zeros(d // 2, dtype=np.uint8), 0.0
|
||||
rot /= norm
|
||||
dst = np.zeros(d // 2, dtype=np.uint8)
|
||||
for i in range(d):
|
||||
idx = int(np.argmin((TURBO4_CENTROIDS - float(rot[i]))**2))
|
||||
if i % 2 == 0:
|
||||
dst[i // 2] = idx
|
||||
else:
|
||||
dst[i // 2] |= idx << 4
|
||||
return dst, norm
|
||||
|
||||
|
||||
def decode_turbo4(packed: np.ndarray, norm: float, d: int) -> np.ndarray:
|
||||
"""Decode packed uint8 → float32 vector."""
|
||||
out = np.empty(d, dtype=np.float32)
|
||||
for i in range(d):
|
||||
p = packed[i // 2]
|
||||
idx = (p & 0x0F) if (i % 2 == 0) else (p >> 4)
|
||||
out[i] = TURBO4_CENTROIDS[idx] * norm
|
||||
_fwht(out)
|
||||
return out
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Test Suite
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestRoundtrip:
|
||||
@pytest.mark.parametrize("d,thresh", [
|
||||
(128, 0.992),
|
||||
(256, 0.990),
|
||||
(512, 0.985),
|
||||
])
|
||||
def test_cosine_similarity(self, d, thresh):
|
||||
x = np.random.default_rng(d).standard_normal(d).astype(np.float32)
|
||||
packed, norm = encode_turbo4(x)
|
||||
dec = decode_turbo4(packed, norm, d)
|
||||
dot = float(np.dot(x, dec))
|
||||
cos = dot / (np.linalg.norm(x) * np.linalg.norm(dec) + 1e-9)
|
||||
assert cos >= thresh, f"d={d} cos={cos:.4f} < {thresh}"
|
||||
|
||||
def test_zero_vector(self):
|
||||
packed, norm = encode_turbo4(np.zeros(128, dtype=np.float32))
|
||||
assert norm == 0.0 and np.all(packed == 0)
|
||||
dec = decode_turbo4(packed, 0.0, 128)
|
||||
assert np.max(np.abs(dec)) <= 1e-5
|
||||
|
||||
def test_pass_rate_20_random(self):
|
||||
ok = 0
|
||||
rng = np.random.default_rng(12345)
|
||||
for _ in range(20):
|
||||
x = rng.standard_normal(128).astype(np.float32)
|
||||
packed, norm = encode_turbo4(x)
|
||||
dec = decode_turbo4(packed, norm, 128)
|
||||
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
|
||||
if cos >= 0.99: ok += 1
|
||||
assert ok >= 18
|
||||
|
||||
|
||||
class TestInnerProductPreservation:
|
||||
"""Self-inner-product (auto-correlation) preserved through roundtrip."""
|
||||
|
||||
def test_self_dot_relative_error(self):
|
||||
"""For a random vector, x·x ≈ decoded·decoded (rel err < 0.05)."""
|
||||
rng = np.random.default_rng(888)
|
||||
for _ in range(10):
|
||||
x = rng.standard_normal(128).astype(np.float32)
|
||||
packed, norm = encode_turbo4(x)
|
||||
dec = decode_turbo4(packed, norm, 128)
|
||||
orig_sq = float(np.dot(x, x))
|
||||
dec_sq = float(np.dot(dec, dec))
|
||||
err = abs(orig_sq - dec_sq) / (orig_sq + 1e-6)
|
||||
assert err < 0.05, f"rel_err={err:.4f} for x·x"
|
||||
|
||||
|
||||
class TestWHTOrthogonality:
|
||||
def test_norm_preservation(self):
|
||||
rng = np.random.default_rng(2024)
|
||||
for d in [32, 64, 128, 256]:
|
||||
x = rng.standard_normal(d).astype(np.float32)
|
||||
on = float(np.linalg.norm(x))
|
||||
wht = x.copy()
|
||||
_fwht(wht)
|
||||
wn = float(np.linalg.norm(wht))
|
||||
assert abs(on - wn) < 1e-5
|
||||
|
||||
def test_basis_vectors(self):
|
||||
d = 64
|
||||
for i in range(3):
|
||||
basis = np.zeros(d, dtype=np.float32)
|
||||
basis[i] = 1.0
|
||||
wht = basis.copy()
|
||||
_fwht(wht)
|
||||
expected = 1.0 / math.sqrt(d)
|
||||
assert np.allclose(np.abs(wht), expected, atol=1e-6)
|
||||
|
||||
def test_involution(self):
|
||||
x = np.random.default_rng(777).standard_normal(128).astype(np.float32)
|
||||
wht = x.copy()
|
||||
_fwht(wht); _fwht(wht)
|
||||
assert np.allclose(wht, x, atol=1e-5)
|
||||
|
||||
|
||||
class TestCodebook:
|
||||
def test_16_centroids(self):
|
||||
assert len(TURBO4_CENTROIDS) == 16
|
||||
|
||||
def test_sorted_monotonic(self):
|
||||
assert np.all(np.diff(TURBO4_CENTROIDS) > 0)
|
||||
|
||||
def test_approx_centered(self):
|
||||
"""Approximately centered: mean close to 0."""
|
||||
assert abs(np.mean(TURBO4_CENTROIDS)) < 0.05
|
||||
|
||||
def test_ordering_bounds(self):
|
||||
c = TURBO4_CENTROIDS
|
||||
assert c[0] < -0.20
|
||||
assert c[7] < 0.02
|
||||
assert c[8] > -0.02
|
||||
assert c[15] > 0.28
|
||||
|
||||
def test_all_indices_valid(self):
|
||||
rng = np.random.default_rng(333)
|
||||
for _ in range(50):
|
||||
x = rng.standard_normal(128).astype(np.float32)
|
||||
packed, _ = encode_turbo4(x)
|
||||
lo = packed & 0x0F
|
||||
hi = (packed >> 4) & 0x0F
|
||||
assert np.all((lo >= 0) & (lo <= 15))
|
||||
assert np.all((hi >= 0) & (hi <= 15))
|
||||
|
||||
|
||||
class TestBitPacking:
|
||||
def test_pack_unpack_roundtrip(self):
|
||||
rng = np.random.default_rng(555)
|
||||
d = 128
|
||||
idx = rng.integers(0, 16, size=d, dtype=np.uint8)
|
||||
packed = np.zeros(d // 2, dtype=np.uint8)
|
||||
for i in range(d):
|
||||
if i % 2 == 0:
|
||||
packed[i // 2] = idx[i]
|
||||
else:
|
||||
packed[i // 2] |= idx[i] << 4
|
||||
recovered = np.empty(d, dtype=np.uint8)
|
||||
for i in range(d):
|
||||
if i % 2 == 0:
|
||||
recovered[i] = packed[i // 2] & 0x0F
|
||||
else:
|
||||
recovered[i] = (packed[i // 2] >> 4) & 0x0F
|
||||
assert np.array_equal(recovered, idx)
|
||||
|
||||
@pytest.mark.parametrize("d", [64, 128, 256, 512])
|
||||
def test_buffer_size_matches_dimension(self, d):
|
||||
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
|
||||
assert len(packed) == d // 2
|
||||
|
||||
def test_packed_bit_count(self):
|
||||
"""Total 4-bit slots exactly equals input dimension."""
|
||||
for d in [64, 128, 256]:
|
||||
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
|
||||
assert len(packed) * 2 == d
|
||||
|
||||
|
||||
class TestEdgeCases:
|
||||
def test_dim_128(self):
|
||||
packed, norm = encode_turbo4(np.random.standard_normal(128).astype(np.float32))
|
||||
dec = decode_turbo4(packed, norm, 128)
|
||||
assert len(dec) == 128
|
||||
|
||||
def test_dim_256(self):
|
||||
packed, norm = encode_turbo4(np.random.standard_normal(256).astype(np.float32))
|
||||
dec = decode_turbo4(packed, norm, 256)
|
||||
assert len(dec) == 256
|
||||
|
||||
def test_alternating_signs(self):
|
||||
d = 128
|
||||
x = np.array([1.0 if i % 2 == 0 else -1.0 for i in range(d)], dtype=np.float32)
|
||||
packed, norm = encode_turbo4(x)
|
||||
dec = decode_turbo4(packed, norm, d)
|
||||
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
|
||||
assert cos >= 0.90
|
||||
|
||||
def test_constant_vector(self):
|
||||
x = np.full(128, 0.5, dtype=np.float32)
|
||||
packed, norm = encode_turbo4(x)
|
||||
dec = decode_turbo4(packed, norm, 128)
|
||||
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
|
||||
assert cos >= 0.85
|
||||
|
||||
|
||||
class TestCompression:
|
||||
def test_four_bit_per_dimension(self):
|
||||
for d in [64, 128, 256, 512]:
|
||||
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
|
||||
# 4 bits per element: 2 elements per byte
|
||||
assert len(packed) * 2 == d
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__, "-v"])
|
||||
Reference in New Issue
Block a user