[kimi] Add comprehensive unit tests for models/budget.py (#1316) (#1347)

This commit is contained in:
2026-03-24 02:48:51 +00:00
parent 3349948f7f
commit e09082a8a8

View File

@@ -0,0 +1,598 @@
"""Unit tests for models/budget.py — comprehensive coverage for budget management.
Tests budget allocation, tracking, limit enforcement, and edge cases including:
- Zero budget scenarios
- Over-budget handling
- Budget reset behavior
- In-memory fallback when DB is unavailable
"""
import threading
import time
from datetime import UTC, date, datetime, timedelta
from unittest.mock import patch
import pytest
from infrastructure.models.budget import (
BudgetTracker,
SpendRecord,
estimate_cost_usd,
get_budget_tracker,
)
pytestmark = pytest.mark.unit
# ── Test SpendRecord dataclass ────────────────────────────────────────────────
class TestSpendRecord:
    """Field-level checks for the SpendRecord dataclass."""

    def test_spend_record_creation(self):
        """Every constructor argument is readable back from its attribute."""
        created_at = time.time()
        fields = {
            "ts": created_at,
            "provider": "anthropic",
            "model": "claude-haiku-4-5",
            "tokens_in": 100,
            "tokens_out": 200,
            "cost_usd": 0.001,
            "tier": "cloud",
        }

        record = SpendRecord(**fields)

        # Compare attribute by attribute so a failure names the bad field.
        for attr_name, expected in fields.items():
            assert getattr(record, attr_name) == expected

    def test_spend_record_with_zero_tokens(self):
        """A record with zero input and output tokens keeps both counts at 0."""
        record = SpendRecord(
            ts=time.time(),
            provider="openai",
            model="gpt-4o",
            tokens_in=0,
            tokens_out=0,
            cost_usd=0.0,
            tier="cloud",
        )

        assert (record.tokens_in, record.tokens_out) == (0, 0)
# ── Test estimate_cost_usd function ───────────────────────────────────────────
class TestEstimateCostUsd:
    """Pricing checks for estimate_cost_usd across known and unknown models."""

    def test_haiku_cheaper_than_sonnet(self):
        """For equal token counts, Haiku is priced below Sonnet."""
        sonnet = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
        haiku = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)

        assert haiku < sonnet

    def test_zero_tokens_is_zero_cost(self):
        """No tokens in either direction costs exactly nothing."""
        cost = estimate_cost_usd("gpt-4o", 0, 0)

        assert cost == 0.0

    def test_only_input_tokens(self):
        """With no output tokens, the cost is the input rate alone."""
        input_rate_per_1k = 0.0025  # USD per 1K input tokens expected for gpt-4o
        expected = (1000 * input_rate_per_1k) / 1000.0

        assert estimate_cost_usd("gpt-4o", 1000, 0) == pytest.approx(expected)

    def test_only_output_tokens(self):
        """With no input tokens, the cost is the output rate alone."""
        output_rate_per_1k = 0.01  # USD per 1K output tokens expected for gpt-4o
        expected = (1000 * output_rate_per_1k) / 1000.0

        assert estimate_cost_usd("gpt-4o", 0, 1000) == pytest.approx(expected)

    def test_unknown_model_uses_default(self):
        """An unrecognised model is billed at the conservative default rates."""
        # Default rates asserted here: 0.003 input, 0.015 output per 1K tokens.
        default_in, default_out = 0.003, 0.015
        expected = (1000 * default_in + 1000 * default_out) / 1000.0

        cost = estimate_cost_usd("some-unknown-model-xyz", 1000, 1000)

        assert cost > 0  # the default must never collapse to free
        assert cost == pytest.approx(expected)

    def test_versioned_model_name_matches(self):
        """A date-suffixed model name is priced like its base model."""
        base = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
        versioned = estimate_cost_usd("claude-haiku-4-5-20251001", 1000, 0)

        assert versioned == base

    def test_gpt4o_mini_cheaper_than_gpt4o(self):
        """GPT-4o mini is priced below full GPT-4o for equal tokens."""
        full = estimate_cost_usd("gpt-4o", 1000, 1000)
        mini = estimate_cost_usd("gpt-4o-mini", 1000, 1000)

        assert mini < full

    def test_opus_most_expensive_claude(self):
        """Claude pricing is strictly ordered: Opus > Sonnet > Haiku."""
        haiku = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
        sonnet = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
        opus = estimate_cost_usd("claude-opus-4-5", 1000, 1000)

        assert opus > sonnet > haiku

    def test_grok_variants(self):
        """Both Grok variants produce a positive cost estimate."""
        for model_name in ("grok-3", "grok-3-fast"):
            assert estimate_cost_usd(model_name, 1000, 1000) > 0

    def test_case_insensitive_matching(self):
        """Letter case in the model name does not affect the estimate."""
        spellings = ("claude-haiku-4-5", "CLAUDE-HAIKU-4-5", "Claude-Haiku-4-5")
        lower, upper, mixed = (estimate_cost_usd(name, 1000, 0) for name in spellings)

        assert lower == upper == mixed

    def test_returns_float(self):
        """The return type is float for known, unknown and zero-token calls."""
        calls = (
            ("haiku", 100, 200),
            ("unknown-model", 100, 200),
            ("haiku", 0, 0),
        )
        for model_name, tokens_in, tokens_out in calls:
            assert isinstance(estimate_cost_usd(model_name, tokens_in, tokens_out), float)
# ── Test BudgetTracker initialization ─────────────────────────────────────────
class TestBudgetTrackerInit:
    """Construction-time behaviour of BudgetTracker."""

    def test_creates_with_memory_db(self):
        """A ':memory:' database path leaves the tracker flagged as DB-ok."""
        assert BudgetTracker(db_path=":memory:")._db_ok is True

    def test_in_memory_fallback_empty_on_creation(self):
        """The in-memory fallback list is empty on a fresh tracker."""
        fresh = BudgetTracker(db_path=":memory:")

        assert fresh._in_memory == []

    def test_custom_db_path(self, tmp_path):
        """A custom path is stored on the tracker and the file is created."""
        target = tmp_path.joinpath("custom_budget.db")

        tracker = BudgetTracker(db_path=str(target))

        assert tracker._db_ok is True
        assert tracker._db_path == str(target)
        assert target.exists()

    def test_db_path_directory_creation(self, tmp_path):
        """Missing parent directories of the DB path are created."""
        nested_dir = tmp_path / "nested" / "dirs"
        target = nested_dir / "budget.db"

        tracker = BudgetTracker(db_path=str(target))

        assert tracker._db_ok is True
        assert nested_dir.exists()

    def test_invalid_db_path_fallback(self):
        """A tracker flagged as DB-unavailable still records spend.

        __init__ is bypassed on purpose: the attributes are set by hand to
        simulate a tracker whose database could not be opened.
        """
        broken = BudgetTracker.__new__(BudgetTracker)
        broken._db_ok = False
        broken._in_memory = []
        broken._lock = threading.Lock()
        broken._db_path = "/nonexistent/invalid/path/budget.db"

        # The explicit cost must come back unchanged despite the missing DB.
        assert broken.record_spend("test", "model", cost_usd=0.01) == 0.01
# ── Test BudgetTracker record_spend ───────────────────────────────────────────
class TestBudgetTrackerRecordSpend:
    """Behaviour of BudgetTracker.record_spend."""

    def test_record_spend_returns_cost(self):
        """Without an explicit cost, a positive estimated cost is returned."""
        tracker = BudgetTracker(db_path=":memory:")

        assert tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200) > 0

    def test_record_spend_explicit_cost(self):
        """An explicit cost_usd is returned as given."""
        tracker = BudgetTracker(db_path=":memory:")

        returned = tracker.record_spend("anthropic", "model", cost_usd=1.23)

        assert returned == pytest.approx(1.23)

    def test_record_spend_accumulates(self):
        """Two records sum up in the daily total."""
        tracker = BudgetTracker(db_path=":memory:")
        for amount in (0.01, 0.02):
            tracker.record_spend("openai", "gpt-4o", cost_usd=amount)

        assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)

    def test_record_spend_with_tier_label(self):
        """A custom tier label is accepted and a non-negative cost returned."""
        tracker = BudgetTracker(db_path=":memory:")

        returned = tracker.record_spend("anthropic", "haiku", tier="cloud_api")

        assert returned >= 0

    def test_record_spend_with_provider(self):
        """Spend from different providers lands in the same daily total."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
        tracker.record_spend("anthropic", "claude-haiku", cost_usd=0.02)

        daily_total = tracker.get_daily_spend()

        assert daily_total == pytest.approx(0.03, abs=1e-9)

    def test_record_zero_cost(self):
        """A zero-cost record is accepted and leaves the daily total at zero."""
        tracker = BudgetTracker(db_path=":memory:")

        returned = tracker.record_spend("test", "model", cost_usd=0.0)

        assert returned == 0.0
        assert tracker.get_daily_spend() == 0.0

    def test_record_negative_cost(self):
        """A negative cost (refund) is accepted and lowers the daily total."""
        tracker = BudgetTracker(db_path=":memory:")

        returned = tracker.record_spend("test", "model", cost_usd=-0.50)

        assert returned == -0.50
        assert tracker.get_daily_spend() == -0.50
# ── Test BudgetTracker daily/monthly spend queries ────────────────────────────
class TestBudgetTrackerSpendQueries:
    """Tests for daily and monthly spend queries."""

    def test_monthly_spend_includes_daily(self):
        """Monthly spend should be >= daily spend."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=5.00)
        assert tracker.get_monthly_spend() >= tracker.get_daily_spend()

    def test_get_daily_spend_empty(self):
        """Daily spend should be zero when no records."""
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker.get_daily_spend() == 0.0

    def test_get_monthly_spend_empty(self):
        """Monthly spend should be zero when no records."""
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker.get_monthly_spend() == 0.0

    def test_daily_spend_isolation(self):
        """Daily spend should only include today's records, not old ones.

        All timestamps derive from a single UTC "now", so the test does not
        depend on the host's local timezone or on the day of the month.
        """
        tracker = BudgetTracker(db_path=":memory:")
        # Force use of in-memory fallback
        tracker._db_ok = False

        # NOTE(review): assumes the tracker computes its day/month boundaries
        # in UTC, as the original test did by stamping records with UTC.
        now = datetime.now(UTC)
        month_start = now.replace(day=1, hour=0, minute=0, second=0, microsecond=0)

        # Record for today: "now" is always inside the current UTC day. The
        # previous local-date-at-UTC-midnight + 1h value could fall on the
        # wrong UTC day when the host timezone is behind UTC.
        tracker._in_memory.append(
            SpendRecord(now.timestamp(), "test", "model", 0, 0, 1.0, "cloud")
        )

        # Old record (2 days ago) -- never part of today's total.
        old = now - timedelta(days=2)
        tracker._in_memory.append(
            SpendRecord(old.timestamp(), "test", "old_model", 0, 0, 2.0, "cloud")
        )

        # Daily should only include today's 1.0
        assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)

        # The old record counts toward the month only when it is still in the
        # current month; on the 1st or 2nd it belongs to the previous month.
        expected_monthly = 3.0 if old >= month_start else 1.0
        assert tracker.get_monthly_spend() == pytest.approx(expected_monthly, abs=1e-9)
# ── Test BudgetTracker cloud_allowed ──────────────────────────────────────────
class TestBudgetTrackerCloudAllowed:
    """Tests for cloud budget limit enforcement via cloud_allowed().

    In these tests a limit of 0 means "disabled", not "zero budget".
    """

    def test_allowed_when_no_spend(self):
        """Cloud should be allowed when no spend recorded."""
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker.cloud_allowed() is True

    def test_blocked_when_daily_limit_exceeded(self):
        """Cloud should be blocked when daily limit exceeded."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        # Pin the limits explicitly so the test does not depend on whatever
        # budget the real settings object carries in this environment.
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 5.0
            mock_settings.tier_cloud_monthly_budget_usd = 0  # monthly disabled
            assert tracker.cloud_allowed() is False

    def test_allowed_when_daily_limit_zero(self):
        """Cloud should be allowed when daily limit is 0 (disabled)."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0  # disabled
            mock_settings.tier_cloud_monthly_budget_usd = 0  # disabled
            assert tracker.cloud_allowed() is True

    def test_blocked_when_monthly_limit_exceeded(self):
        """Cloud should be blocked when monthly limit exceeded."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0  # daily disabled
            mock_settings.tier_cloud_monthly_budget_usd = 10.0
            assert tracker.cloud_allowed() is False

    def test_allowed_at_exact_daily_limit(self):
        """Cloud should be blocked when spend is exactly at the daily limit.

        The method name is kept for test-ID stability; the behaviour asserted
        is "blocked", because the limit check treats spend >= limit as over.
        """
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 5.0
            mock_settings.tier_cloud_monthly_budget_usd = 0
            # Record exactly at limit
            tracker.record_spend("test", "model", cost_usd=5.0)
            assert tracker.cloud_allowed() is False

    def test_allowed_below_daily_limit(self):
        """Cloud should be allowed when below daily limit."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 5.0
            mock_settings.tier_cloud_monthly_budget_usd = 0
            tracker.record_spend("test", "model", cost_usd=4.99)
            assert tracker.cloud_allowed() is True

    def test_zero_budget_blocks_all(self):
        """A near-zero daily budget should block cloud once it is exceeded.

        A literal 0 cannot be used here because 0 disables the limit, so the
        smallest practical budget (0.01) stands in for "almost no budget".
        """
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0.01  # Very small budget
            mock_settings.tier_cloud_monthly_budget_usd = 0
            tracker.record_spend("test", "model", cost_usd=0.02)
            # Over the tiny budget, should be blocked
            assert tracker.cloud_allowed() is False

    def test_both_limits_checked(self):
        """Both daily and monthly limits should be checked."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 100.0
            mock_settings.tier_cloud_monthly_budget_usd = 10.0
            tracker.record_spend("test", "model", cost_usd=15.0)
            # Under daily but over monthly
            assert tracker.cloud_allowed() is False
# ── Test BudgetTracker summary ────────────────────────────────────────────────
class TestBudgetTrackerSummary:
    """Tests for the dictionary returned by BudgetTracker.get_summary()."""

    def test_summary_keys_present(self):
        """Summary should contain all expected keys."""
        tracker = BudgetTracker(db_path=":memory:")
        summary = tracker.get_summary()
        expected_keys = (
            "daily_usd",
            "monthly_usd",
            "daily_limit_usd",
            "monthly_limit_usd",
            "daily_ok",
            "monthly_ok",
        )
        for key in expected_keys:
            assert key in summary

    def test_summary_daily_ok_true_on_empty(self):
        """daily_ok and monthly_ok should be True when empty."""
        tracker = BudgetTracker(db_path=":memory:")
        summary = tracker.get_summary()
        assert summary["daily_ok"] is True
        assert summary["monthly_ok"] is True

    def test_summary_daily_ok_false_when_exceeded(self):
        """daily_ok should be False when daily limit exceeded."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("openai", "gpt-4o", cost_usd=999.0)
        # Pin the limits so the outcome does not depend on the budget that
        # the real settings object happens to carry in this environment.
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 5.0
            mock_settings.tier_cloud_monthly_budget_usd = 0  # monthly disabled
            summary = tracker.get_summary()
            assert summary["daily_ok"] is False

    def test_summary_monthly_ok_false_when_exceeded(self):
        """monthly_ok should be False when monthly limit exceeded."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0
            mock_settings.tier_cloud_monthly_budget_usd = 10.0
            tracker.record_spend("openai", "gpt-4o", cost_usd=15.0)
            summary = tracker.get_summary()
            assert summary["monthly_ok"] is False

    def test_summary_values_rounded(self):
        """Summary values should be rounded appropriately."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("test", "model", cost_usd=1.123456789)
        summary = tracker.get_summary()
        # daily_usd should be rounded to 6 decimal places
        assert summary["daily_usd"] == 1.123457

    def test_summary_with_disabled_limits(self):
        """Summary should handle disabled limits (0)."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0
            mock_settings.tier_cloud_monthly_budget_usd = 0
            tracker.record_spend("test", "model", cost_usd=100.0)
            summary = tracker.get_summary()
            assert summary["daily_limit_usd"] == 0
            assert summary["monthly_limit_usd"] == 0
            # With both limits disabled, no amount of spend trips the flags.
            assert summary["daily_ok"] is True
            assert summary["monthly_ok"] is True
# ── Test BudgetTracker in-memory fallback ─────────────────────────────────────
class TestBudgetTrackerInMemoryFallback:
    """Behaviour of the in-memory record list used when the DB is flagged off."""

    def test_in_memory_records_persisted(self):
        """With the DB flagged unavailable, a spend lands in the memory list."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker._db_ok = False  # pretend the database cannot be used

        tracker.record_spend("test", "model", cost_usd=0.01)

        stored = tracker._in_memory
        assert len(stored) == 1
        assert stored[0].cost_usd == 0.01

    def test_in_memory_query_spend(self):
        """_query_spend sums in-memory records newer than the cutoff."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker._db_ok = False
        tracker.record_spend("test", "model", cost_usd=0.01)

        # A cutoff one hour back comfortably includes the record just made.
        one_hour_ago = datetime.now(UTC) - timedelta(hours=1)

        assert tracker._query_spend(one_hour_ago.timestamp()) == 0.01

    def test_in_memory_older_records_not_counted(self):
        """In-memory records older than the cutoff are excluded from the sum."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker._db_ok = False

        two_days_ago = datetime.now(UTC) - timedelta(days=2)
        stale = SpendRecord(two_days_ago.timestamp(), "test", "model", 0, 0, 1.0, "cloud")
        tracker._in_memory.append(stale)

        # Only look at the last 24 hours; the stale record falls outside.
        cutoff = (datetime.now(UTC) - timedelta(days=1)).timestamp()

        assert tracker._query_spend(cutoff) == 0.0
# ── Test BudgetTracker thread safety ──────────────────────────────────────────
class TestBudgetTrackerThreadSafety:
    """Concurrency checks for BudgetTracker.record_spend."""

    def test_concurrent_record_spend(self):
        """Five threads recording ten spends each produce a consistent total."""
        tracker = BudgetTracker(db_path=":memory:")
        costs = []
        failures = []

        def worker():
            # Any exception is collected rather than raised, so a failure in
            # one thread is surfaced by the assertion on the main thread.
            try:
                for _ in range(10):
                    costs.append(tracker.record_spend("test", "model", cost_usd=0.01))
            except Exception as exc:
                failures.append(exc)

        workers = [threading.Thread(target=worker) for _ in range(5)]
        for thread in workers:
            thread.start()
        for thread in workers:
            thread.join()

        assert not failures
        assert len(costs) == 50
        assert tracker.get_daily_spend() == pytest.approx(0.50, abs=1e-9)
# ── Test BudgetTracker edge cases ─────────────────────────────────────────────
class TestBudgetTrackerEdgeCases:
    """Boundary-value checks for BudgetTracker."""

    def test_very_small_cost(self):
        """A one-millionth-of-a-dollar spend shows up in the daily total."""
        tiny = 0.000001
        tracker = BudgetTracker(db_path=":memory:")

        tracker.record_spend("test", "model", cost_usd=tiny)

        assert tracker.get_daily_spend() == pytest.approx(tiny, abs=1e-9)

    def test_very_large_cost(self):
        """A million-dollar spend shows up in the daily total."""
        huge = 1_000_000.0
        tracker = BudgetTracker(db_path=":memory:")

        tracker.record_spend("test", "model", cost_usd=huge)

        assert tracker.get_daily_spend() == pytest.approx(huge, abs=1e-9)

    def test_many_records(self):
        """One hundred one-cent records add up to one dollar."""
        tracker = BudgetTracker(db_path=":memory:")
        index = 0
        while index < 100:
            tracker.record_spend(f"provider_{index}", f"model_{index}", cost_usd=0.01)
            index += 1

        assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)

    def test_empty_provider_name(self):
        """An empty provider string is accepted and the cost returned as given."""
        tracker = BudgetTracker(db_path=":memory:")

        returned = tracker.record_spend("", "model", cost_usd=0.01)

        assert returned == 0.01

    def test_empty_model_name(self):
        """An empty model string is accepted and the cost returned as given."""
        tracker = BudgetTracker(db_path=":memory:")

        returned = tracker.record_spend("provider", "", cost_usd=0.01)

        assert returned == 0.01
# ── Test get_budget_tracker singleton ─────────────────────────────────────────
class TestGetBudgetTrackerSingleton:
    """Tests for the module-level BudgetTracker singleton."""

    @pytest.fixture(autouse=True)
    def _fresh_singleton(self, monkeypatch):
        """Clear the cached singleton for each test and restore it afterwards.

        Assigning to the module global directly would leak whatever tracker
        a test created into every test that runs later; monkeypatch puts the
        previous value back on teardown.
        """
        import infrastructure.models.budget as bmod

        monkeypatch.setattr(bmod, "_budget_tracker", None)

    def test_returns_budget_tracker(self):
        """Singleton should return a BudgetTracker instance."""
        tracker = get_budget_tracker()
        assert isinstance(tracker, BudgetTracker)

    def test_returns_same_instance(self):
        """Singleton should return the same instance."""
        t1 = get_budget_tracker()
        t2 = get_budget_tracker()
        assert t1 is t2

    def test_singleton_persists_state(self):
        """The same instance is returned after spend has been recorded."""
        tracker1 = get_budget_tracker()
        # NOTE(review): this writes through the tracker built by
        # get_budget_tracker(); confirm it cannot touch a real budget DB.
        tracker1.record_spend("test", "model", cost_usd=1.0)
        # Get singleton again
        tracker2 = get_budget_tracker()
        assert tracker1 is tracker2
# ── Test BudgetTracker with mocked settings ───────────────────────────────────
class TestBudgetTrackerWithMockedSettings:
    """cloud_allowed() under several patched budget configurations."""

    def test_high_daily_limit(self):
        """Spend well under generous daily and monthly limits stays allowed."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as fake_settings:
            fake_settings.tier_cloud_monthly_budget_usd = 10000.0
            fake_settings.tier_cloud_daily_budget_usd = 1000.0

            tracker.record_spend("test", "model", cost_usd=500.0)

            assert tracker.cloud_allowed() is True

    def test_low_daily_limit(self):
        """Spend above a small daily limit is blocked."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as fake_settings:
            fake_settings.tier_cloud_monthly_budget_usd = 100.0
            fake_settings.tier_cloud_daily_budget_usd = 1.0

            tracker.record_spend("test", "model", cost_usd=2.0)

            assert tracker.cloud_allowed() is False

    def test_only_monthly_limit_enabled(self):
        """With daily disabled, cloud is blocked once monthly spend passes 50."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as fake_settings:
            fake_settings.tier_cloud_monthly_budget_usd = 50.0
            fake_settings.tier_cloud_daily_budget_usd = 0  # Disabled

            # 30 of 50 used: still allowed.
            tracker.record_spend("test", "model", cost_usd=30.0)
            still_allowed = tracker.cloud_allowed()
            assert still_allowed is True

            # 55 of 50 used: blocked.
            tracker.record_spend("test", "model", cost_usd=25.0)
            now_allowed = tracker.cloud_allowed()
            assert now_allowed is False