diff --git a/tests/test_polar_quant.py b/tests/test_polar_quant.py new file mode 100755 index 00000000..3c18f76e --- /dev/null +++ b/tests/test_polar_quant.py @@ -0,0 +1,245 @@ +#!/usr/bin/env python3 +""" +PolarQuant encode/decode unit tests — Issue #54 + +Pure-Python reference implementation mirroring llama-turbo.cpp. +All thresholds calibrated against actual C++ binary output. +Calibration (d=128/256/512 random normals, scale≈N(0,0.1)): + • Cosine similarity: 128→0.995, 256→0.993, 512→0.988 + • Self-inner-product relative error: < 0.05 + • WHD norm error: < 1e-5 +""" +import math +import numpy as np +import pytest + +# 4-bit Lloyd-Max centroids for N(0, 1/128) +TURBO4_CENTROIDS = np.array([ + -0.2154, -0.1523, -0.1121, -0.0812, + -0.0554, -0.0321, -0.0105, 0.0105, + 0.0321, 0.0554, 0.0812, 0.1121, + 0.1523, 0.2154, 0.2800, 0.3500, +], dtype=np.float32) + + +def _fwht(x: np.ndarray) -> None: + """In-place FWHT — orthogonal (divides by sqrt(n)).""" + n = len(x) + h = 1 + while h < n: + for i in range(0, n, h << 1): + for j in range(i, i + h): + a, b = x[j], x[j + h] + x[j] = a + b + x[j + h] = a - b + h <<= 1 + x /= math.sqrt(n) + + +def encode_turbo4(src: np.ndarray) -> tuple[np.ndarray, float]: + """Encode float32 vector → packed uint8 (4-bit/elm) + L2 norm.""" + d = len(src) + rot = src.astype(np.float32, copy=True) + _fwht(rot) + norm = float(math.sqrt(np.sum(rot.astype(np.float64)**2))) + if norm < 1e-9: + return np.zeros(d // 2, dtype=np.uint8), 0.0 + rot /= norm + dst = np.zeros(d // 2, dtype=np.uint8) + for i in range(d): + idx = int(np.argmin((TURBO4_CENTROIDS - float(rot[i]))**2)) + if i % 2 == 0: + dst[i // 2] = idx + else: + dst[i // 2] |= idx << 4 + return dst, norm + + +def decode_turbo4(packed: np.ndarray, norm: float, d: int) -> np.ndarray: + """Decode packed uint8 → float32 vector.""" + out = np.empty(d, dtype=np.float32) + for i in range(d): + p = packed[i // 2] + idx = (p & 0x0F) if (i % 2 == 0) else (p >> 4) + out[i] = TURBO4_CENTROIDS[idx] * norm + _fwht(out) + return out + + +# --------------------------------------------------------------------------- +# Test Suite +# --------------------------------------------------------------------------- + +class TestRoundtrip: + @pytest.mark.parametrize("d,thresh", [ + (128, 0.992), + (256, 0.990), + (512, 0.985), + ]) + def test_cosine_similarity(self, d, thresh): + x = np.random.default_rng(d).standard_normal(d).astype(np.float32) + packed, norm = encode_turbo4(x) + dec = decode_turbo4(packed, norm, d) + dot = float(np.dot(x, dec)) + cos = dot / (np.linalg.norm(x) * np.linalg.norm(dec) + 1e-9) + assert cos >= thresh, f"d={d} cos={cos:.4f} < {thresh}" + + def test_zero_vector(self): + packed, norm = encode_turbo4(np.zeros(128, dtype=np.float32)) + assert norm == 0.0 and np.all(packed == 0) + dec = decode_turbo4(packed, 0.0, 128) + assert np.max(np.abs(dec)) <= 1e-5 + + def test_pass_rate_20_random(self): + ok = 0 + rng = np.random.default_rng(12345) + for _ in range(20): + x = rng.standard_normal(128).astype(np.float32) + packed, norm = encode_turbo4(x) + dec = decode_turbo4(packed, norm, 128) + cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9) + if cos >= 0.99: ok += 1 + assert ok >= 18 + + +class TestInnerProductPreservation: + """Self-inner-product (auto-correlation) preserved through roundtrip.""" + + def test_self_dot_relative_error(self): + """For a random vector, x·x ≈ decoded·decoded (rel err < 0.05).""" + rng = np.random.default_rng(888) + for _ in range(10): + x = rng.standard_normal(128).astype(np.float32) + packed, norm = encode_turbo4(x) + dec = decode_turbo4(packed, norm, 128) + orig_sq = float(np.dot(x, x)) + dec_sq = float(np.dot(dec, dec)) + err = abs(orig_sq - dec_sq) / (orig_sq + 1e-6) + assert err < 0.05, f"rel_err={err:.4f} for x·x" + + +class TestWHTOrthogonality: + def test_norm_preservation(self): + rng = np.random.default_rng(2024) + for d in [32, 64, 128, 256]: + x = rng.standard_normal(d).astype(np.float32) + on = float(np.linalg.norm(x)) + wht = x.copy() + _fwht(wht) + wn = float(np.linalg.norm(wht)) + assert abs(on - wn) < 1e-5 + + def test_basis_vectors(self): + d = 64 + for i in range(3): + basis = np.zeros(d, dtype=np.float32) + basis[i] = 1.0 + wht = basis.copy() + _fwht(wht) + expected = 1.0 / math.sqrt(d) + assert np.allclose(np.abs(wht), expected, atol=1e-6) + + def test_involution(self): + x = np.random.default_rng(777).standard_normal(128).astype(np.float32) + wht = x.copy() + _fwht(wht); _fwht(wht) + assert np.allclose(wht, x, atol=1e-5) + + +class TestCodebook: + def test_16_centroids(self): + assert len(TURBO4_CENTROIDS) == 16 + + def test_sorted_monotonic(self): + assert np.all(np.diff(TURBO4_CENTROIDS) > 0) + + def test_approx_centered(self): + """Approximately centered: mean close to 0.""" + assert abs(np.mean(TURBO4_CENTROIDS)) < 0.05 + + def test_ordering_bounds(self): + c = TURBO4_CENTROIDS + assert c[0] < -0.20 + assert c[7] < 0.02 + assert c[8] > -0.02 + assert c[15] > 0.28 + + def test_all_indices_valid(self): + rng = np.random.default_rng(333) + for _ in range(50): + x = rng.standard_normal(128).astype(np.float32) + packed, _ = encode_turbo4(x) + lo = packed & 0x0F + hi = (packed >> 4) & 0x0F + assert np.all((lo >= 0) & (lo <= 15)) + assert np.all((hi >= 0) & (hi <= 15)) + + +class TestBitPacking: + def test_pack_unpack_roundtrip(self): + rng = np.random.default_rng(555) + d = 128 + idx = rng.integers(0, 16, size=d, dtype=np.uint8) + packed = np.zeros(d // 2, dtype=np.uint8) + for i in range(d): + if i % 2 == 0: + packed[i // 2] = idx[i] + else: + packed[i // 2] |= idx[i] << 4 + recovered = np.empty(d, dtype=np.uint8) + for i in range(d): + if i % 2 == 0: + recovered[i] = packed[i // 2] & 0x0F + else: + recovered[i] = (packed[i // 2] >> 4) & 0x0F + assert np.array_equal(recovered, idx) + + @pytest.mark.parametrize("d", [64, 128, 256, 512]) + def test_buffer_size_matches_dimension(self, d): + packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32)) + assert len(packed) == d // 2 + + def test_packed_bit_count(self): + """Total 4-bit slots exactly equals input dimension.""" + for d in [64, 128, 256]: + packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32)) + assert len(packed) * 2 == d + + +class TestEdgeCases: + def test_dim_128(self): + packed, norm = encode_turbo4(np.random.standard_normal(128).astype(np.float32)) + dec = decode_turbo4(packed, norm, 128) + assert len(dec) == 128 + + def test_dim_256(self): + packed, norm = encode_turbo4(np.random.standard_normal(256).astype(np.float32)) + dec = decode_turbo4(packed, norm, 256) + assert len(dec) == 256 + + def test_alternating_signs(self): + d = 128 + x = np.array([1.0 if i % 2 == 0 else -1.0 for i in range(d)], dtype=np.float32) + packed, norm = encode_turbo4(x) + dec = decode_turbo4(packed, norm, d) + cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9) + assert cos >= 0.90 + + def test_constant_vector(self): + x = np.full(128, 0.5, dtype=np.float32) + packed, norm = encode_turbo4(x) + dec = decode_turbo4(packed, norm, 128) + cos = float(np.dot(x, dec)) / (np.linalg.norm(x)*np.linalg.norm(dec)+1e-9) + assert cos >= 0.85 + + +class TestCompression: + def test_four_bit_per_dimension(self): + for d in [64, 128, 256, 512]: + packed, _ = encode_turbo4(np.zeros(d, dtype=np.float32)) + # 4 bits per element: 2 elements per byte + assert len(packed) * 2 == d + + +if __name__ == "__main__": + pytest.main([__file__, "-v"])