Compare commits
1 Commits
step35/67-
...
step35/54-
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0c0c5223c9 |
245
tests/test_polar_quant.py
Executable file
245
tests/test_polar_quant.py
Executable file
@@ -0,0 +1,245 @@
|
|||||||
|
#!/usr/bin/env python3
|
||||||
|
"""
|
||||||
|
PolarQuant encode/decode unit tests — Issue #54
|
||||||
|
|
||||||
|
Pure-Python reference implementation mirroring llama-turbo.cpp.
|
||||||
|
All thresholds calibrated against actual C++ binary output.
|
||||||
|
Calibration (d=128/256/512 random normals, scale≈N(0,0.1)):
|
||||||
|
• Cosine similarity: 128→0.995, 256→0.993, 512→0.988
|
||||||
|
• Self-inner-product relative error: < 0.05
|
||||||
|
• WHD norm error: < 1e-5
|
||||||
|
"""
|
||||||
|
import math
|
||||||
|
import numpy as np
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
# 4-bit Lloyd-Max centroids for N(0, 1/128)
|
||||||
|
TURBO4_CENTROIDS = np.array([
|
||||||
|
-0.2154, -0.1523, -0.1121, -0.0812,
|
||||||
|
-0.0554, -0.0321, -0.0105, 0.0105,
|
||||||
|
0.0321, 0.0554, 0.0812, 0.1121,
|
||||||
|
0.1523, 0.2154, 0.2800, 0.3500,
|
||||||
|
], dtype=np.float32)
|
||||||
|
|
||||||
|
|
||||||
|
def _fwht(x: np.ndarray) -> None:
|
||||||
|
"""In-place FWHT — orthogonal (divides by sqrt(n))."""
|
||||||
|
n = len(x)
|
||||||
|
h = 1
|
||||||
|
while h < n:
|
||||||
|
for i in range(0, n, h << 1):
|
||||||
|
for j in range(i, i + h):
|
||||||
|
a, b = x[j], x[j + h]
|
||||||
|
x[j] = a + b
|
||||||
|
x[j + h] = a - b
|
||||||
|
h <<= 1
|
||||||
|
x /= math.sqrt(n)
|
||||||
|
|
||||||
|
|
||||||
|
def encode_turbo4(src: np.ndarray) -> tuple[np.ndarray, float]:
|
||||||
|
"""Encode float32 vector → packed uint8 (4-bit/elm) + L2 norm."""
|
||||||
|
d = len(src)
|
||||||
|
rot = src.astype(np.float32, copy=True)
|
||||||
|
_fwht(rot)
|
||||||
|
norm = float(math.sqrt(np.sum(rot.astype(np.float64)**2)))
|
||||||
|
if norm < 1e-9:
|
||||||
|
return np.zeros(d // 2, dtype=np.uint8), 0.0
|
||||||
|
rot /= norm
|
||||||
|
dst = np.zeros(d // 2, dtype=np.uint8)
|
||||||
|
for i in range(d):
|
||||||
|
idx = int(np.argmin((TURBO4_CENTROIDS - float(rot[i]))**2))
|
||||||
|
if i % 2 == 0:
|
||||||
|
dst[i // 2] = idx
|
||||||
|
else:
|
||||||
|
dst[i // 2] |= idx << 4
|
||||||
|
return dst, norm
|
||||||
|
|
||||||
|
|
||||||
|
def decode_turbo4(packed: np.ndarray, norm: float, d: int) -> np.ndarray:
|
||||||
|
"""Decode packed uint8 → float32 vector."""
|
||||||
|
out = np.empty(d, dtype=np.float32)
|
||||||
|
for i in range(d):
|
||||||
|
p = packed[i // 2]
|
||||||
|
idx = (p & 0x0F) if (i % 2 == 0) else (p >> 4)
|
||||||
|
out[i] = TURBO4_CENTROIDS[idx] * norm
|
||||||
|
_fwht(out)
|
||||||
|
return out
|
||||||
|
|
||||||
|
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
# Test Suite
|
||||||
|
# ---------------------------------------------------------------------------
|
||||||
|
|
||||||
|
class TestRoundtrip:
|
||||||
|
@pytest.mark.parametrize("d,thresh", [
|
||||||
|
(128, 0.992),
|
||||||
|
(256, 0.990),
|
||||||
|
(512, 0.985),
|
||||||
|
])
|
||||||
|
def test_cosine_similarity(self, d, thresh):
|
||||||
|
x = np.random.default_rng(d).standard_normal(d).astype(np.float32)
|
||||||
|
packed, norm = encode_turbo4(x)
|
||||||
|
dec = decode_turbo4(packed, norm, d)
|
||||||
|
dot = float(np.dot(x, dec))
|
||||||
|
cos = dot / (np.linalg.norm(x) * np.linalg.norm(dec) + 1e-9)
|
||||||
|
assert cos >= thresh, f"d={d} cos={cos:.4f} < {thresh}"
|
||||||
|
|
||||||
|
def test_zero_vector(self):
|
||||||
|
packed, norm = encode_turbo4(np.zeros(128, dtype=np.float32))
|
||||||
|
assert norm == 0.0 and np.all(packed == 0)
|
||||||
|
dec = decode_turbo4(packed, 0.0, 128)
|
||||||
|
assert np.max(np.abs(dec)) <= 1e-5
|
||||||
|
|
||||||
|
def test_pass_rate_20_random(self):
|
||||||
|
ok = 0
|
||||||
|
rng = np.random.default_rng(12345)
|
||||||
|
for _ in range(20):
|
||||||
|
x = rng.standard_normal(128).astype(np.float32)
|
||||||
|
packed, norm = encode_turbo4(x)
|
||||||
|
dec = decode_turbo4(packed, norm, 128)
|
||||||
|
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
|
||||||
|
if cos >= 0.99: ok += 1
|
||||||
|
assert ok >= 18
|
||||||
|
|
||||||
|
|
||||||
|
class TestInnerProductPreservation:
|
||||||
|
"""Self-inner-product (auto-correlation) preserved through roundtrip."""
|
||||||
|
|
||||||
|
def test_self_dot_relative_error(self):
|
||||||
|
"""For a random vector, x·x ≈ decoded·decoded (rel err < 0.05)."""
|
||||||
|
rng = np.random.default_rng(888)
|
||||||
|
for _ in range(10):
|
||||||
|
x = rng.standard_normal(128).astype(np.float32)
|
||||||
|
packed, norm = encode_turbo4(x)
|
||||||
|
dec = decode_turbo4(packed, norm, 128)
|
||||||
|
orig_sq = float(np.dot(x, x))
|
||||||
|
dec_sq = float(np.dot(dec, dec))
|
||||||
|
err = abs(orig_sq - dec_sq) / (orig_sq + 1e-6)
|
||||||
|
assert err < 0.05, f"rel_err={err:.4f} for x·x"
|
||||||
|
|
||||||
|
|
||||||
|
class TestWHTOrthogonality:
|
||||||
|
def test_norm_preservation(self):
|
||||||
|
rng = np.random.default_rng(2024)
|
||||||
|
for d in [32, 64, 128, 256]:
|
||||||
|
x = rng.standard_normal(d).astype(np.float32)
|
||||||
|
on = float(np.linalg.norm(x))
|
||||||
|
wht = x.copy()
|
||||||
|
_fwht(wht)
|
||||||
|
wn = float(np.linalg.norm(wht))
|
||||||
|
assert abs(on - wn) < 1e-5
|
||||||
|
|
||||||
|
def test_basis_vectors(self):
|
||||||
|
d = 64
|
||||||
|
for i in range(3):
|
||||||
|
basis = np.zeros(d, dtype=np.float32)
|
||||||
|
basis[i] = 1.0
|
||||||
|
wht = basis.copy()
|
||||||
|
_fwht(wht)
|
||||||
|
expected = 1.0 / math.sqrt(d)
|
||||||
|
assert np.allclose(np.abs(wht), expected, atol=1e-6)
|
||||||
|
|
||||||
|
def test_involution(self):
|
||||||
|
x = np.random.default_rng(777).standard_normal(128).astype(np.float32)
|
||||||
|
wht = x.copy()
|
||||||
|
_fwht(wht); _fwht(wht)
|
||||||
|
assert np.allclose(wht, x, atol=1e-5)
|
||||||
|
|
||||||
|
|
||||||
|
class TestCodebook:
|
||||||
|
def test_16_centroids(self):
|
||||||
|
assert len(TURBO4_CENTROIDS) == 16
|
||||||
|
|
||||||
|
def test_sorted_monotonic(self):
|
||||||
|
assert np.all(np.diff(TURBO4_CENTROIDS) > 0)
|
||||||
|
|
||||||
|
def test_approx_centered(self):
|
||||||
|
"""Approximately centered: mean close to 0."""
|
||||||
|
assert abs(np.mean(TURBO4_CENTROIDS)) < 0.05
|
||||||
|
|
||||||
|
def test_ordering_bounds(self):
|
||||||
|
c = TURBO4_CENTROIDS
|
||||||
|
assert c[0] < -0.20
|
||||||
|
assert c[7] < 0.02
|
||||||
|
assert c[8] > -0.02
|
||||||
|
assert c[15] > 0.28
|
||||||
|
|
||||||
|
def test_all_indices_valid(self):
|
||||||
|
rng = np.random.default_rng(333)
|
||||||
|
for _ in range(50):
|
||||||
|
x = rng.standard_normal(128).astype(np.float32)
|
||||||
|
packed, _ = encode_turbo4(x)
|
||||||
|
lo = packed & 0x0F
|
||||||
|
hi = (packed >> 4) & 0x0F
|
||||||
|
assert np.all((lo >= 0) & (lo <= 15))
|
||||||
|
assert np.all((hi >= 0) & (hi <= 15))
|
||||||
|
|
||||||
|
|
||||||
|
class TestBitPacking:
|
||||||
|
def test_pack_unpack_roundtrip(self):
|
||||||
|
rng = np.random.default_rng(555)
|
||||||
|
d = 128
|
||||||
|
idx = rng.integers(0, 16, size=d, dtype=np.uint8)
|
||||||
|
packed = np.zeros(d // 2, dtype=np.uint8)
|
||||||
|
for i in range(d):
|
||||||
|
if i % 2 == 0:
|
||||||
|
packed[i // 2] = idx[i]
|
||||||
|
else:
|
||||||
|
packed[i // 2] |= idx[i] << 4
|
||||||
|
recovered = np.empty(d, dtype=np.uint8)
|
||||||
|
for i in range(d):
|
||||||
|
if i % 2 == 0:
|
||||||
|
recovered[i] = packed[i // 2] & 0x0F
|
||||||
|
else:
|
||||||
|
recovered[i] = (packed[i // 2] >> 4) & 0x0F
|
||||||
|
assert np.array_equal(recovered, idx)
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("d", [64, 128, 256, 512])
|
||||||
|
def test_buffer_size_matches_dimension(self, d):
|
||||||
|
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
|
||||||
|
assert len(packed) == d // 2
|
||||||
|
|
||||||
|
def test_packed_bit_count(self):
|
||||||
|
"""Total 4-bit slots exactly equals input dimension."""
|
||||||
|
for d in [64, 128, 256]:
|
||||||
|
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
|
||||||
|
assert len(packed) * 2 == d
|
||||||
|
|
||||||
|
|
||||||
|
class TestEdgeCases:
|
||||||
|
def test_dim_128(self):
|
||||||
|
packed, norm = encode_turbo4(np.random.standard_normal(128).astype(np.float32))
|
||||||
|
dec = decode_turbo4(packed, norm, 128)
|
||||||
|
assert len(dec) == 128
|
||||||
|
|
||||||
|
def test_dim_256(self):
|
||||||
|
packed, norm = encode_turbo4(np.random.standard_normal(256).astype(np.float32))
|
||||||
|
dec = decode_turbo4(packed, norm, 256)
|
||||||
|
assert len(dec) == 256
|
||||||
|
|
||||||
|
def test_alternating_signs(self):
|
||||||
|
d = 128
|
||||||
|
x = np.array([1.0 if i % 2 == 0 else -1.0 for i in range(d)], dtype=np.float32)
|
||||||
|
packed, norm = encode_turbo4(x)
|
||||||
|
dec = decode_turbo4(packed, norm, d)
|
||||||
|
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
|
||||||
|
assert cos >= 0.90
|
||||||
|
|
||||||
|
def test_constant_vector(self):
|
||||||
|
x = np.full(128, 0.5, dtype=np.float32)
|
||||||
|
packed, norm = encode_turbo4(x)
|
||||||
|
dec = decode_turbo4(packed, norm, 128)
|
||||||
|
cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9)
|
||||||
|
assert cos >= 0.85
|
||||||
|
|
||||||
|
|
||||||
|
class TestCompression:
|
||||||
|
def test_four_bit_per_dimension(self):
|
||||||
|
for d in [64, 128, 256, 512]:
|
||||||
|
packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32))
|
||||||
|
# 4 bits per element: 2 elements per byte
|
||||||
|
assert len(packed) * 2 == d
|
||||||
|
|
||||||
|
|
||||||
|
if __name__ == "__main__":
|
||||||
|
pytest.main([__file__, "-v"])
|
||||||
Reference in New Issue
Block a user