"""Unit tests for models/budget.py — comprehensive coverage for budget management.

Tests budget allocation, tracking, limit enforcement, and edge cases including:
- Zero budget scenarios
- Over-budget handling
- Budget reset behavior
- In-memory fallback when DB is unavailable
"""

import threading
import time
from datetime import UTC, date, datetime, timedelta
from unittest.mock import patch

import pytest

from infrastructure.models.budget import (
    BudgetTracker,
    SpendRecord,
    estimate_cost_usd,
    get_budget_tracker,
)

pytestmark = pytest.mark.unit


# ── Test SpendRecord dataclass ────────────────────────────────────────────────


class TestSpendRecord:
    """Tests for the SpendRecord dataclass."""

    def test_spend_record_creation(self):
        """Every field passed to the constructor is readable back unchanged."""
        ts = time.time()
        record = SpendRecord(
            ts=ts,
            provider="anthropic",
            model="claude-haiku-4-5",
            tokens_in=100,
            tokens_out=200,
            cost_usd=0.001,
            tier="cloud",
        )
        assert record.ts == ts
        assert record.provider == "anthropic"
        assert record.model == "claude-haiku-4-5"
        assert record.tokens_in == 100
        assert record.tokens_out == 200
        assert record.cost_usd == 0.001
        assert record.tier == "cloud"

    def test_spend_record_with_zero_tokens(self):
        """A record with zero tokens and zero cost is accepted."""
        record = SpendRecord(
            ts=time.time(),
            provider="openai",
            model="gpt-4o",
            tokens_in=0,
            tokens_out=0,
            cost_usd=0.0,
            tier="cloud",
        )
        assert record.tokens_in == 0
        assert record.tokens_out == 0


# ── Test estimate_cost_usd function ───────────────────────────────────────────


class TestEstimateCostUsd:
    """Tests for the estimate_cost_usd function."""

    def test_haiku_cheaper_than_sonnet(self):
        """Haiku should be cheaper than Sonnet for same tokens."""
        haiku_cost = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
        sonnet_cost = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
        assert haiku_cost < sonnet_cost

    def test_zero_tokens_is_zero_cost(self):
        """Zero tokens should result in zero cost."""
        assert estimate_cost_usd("gpt-4o", 0, 0) == 0.0

    def test_only_input_tokens(self):
        """Cost calculation with only input tokens."""
        cost = estimate_cost_usd("gpt-4o", 1000, 0)
        expected = (1000 * 0.0025) / 1000.0  # $0.0025 per 1K input tokens
        assert cost == pytest.approx(expected)

    def test_only_output_tokens(self):
        """Cost calculation with only output tokens."""
        cost = estimate_cost_usd("gpt-4o", 0, 1000)
        expected = (1000 * 0.01) / 1000.0  # $0.01 per 1K output tokens
        assert cost == pytest.approx(expected)

    def test_unknown_model_uses_default(self):
        """Unknown model should use conservative default cost, not zero."""
        cost = estimate_cost_usd("some-unknown-model-xyz", 1000, 1000)
        assert cost > 0
        # Default is 0.003 input, 0.015 output per 1K
        expected = (1000 * 0.003 + 1000 * 0.015) / 1000.0
        assert cost == pytest.approx(expected)

    def test_versioned_model_name_matches(self):
        """Versioned model names should match base model rates."""
        versioned = estimate_cost_usd("claude-haiku-4-5-20251001", 1000, 0)
        base = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
        assert versioned == base

    def test_gpt4o_mini_cheaper_than_gpt4o(self):
        """GPT-4o mini should be cheaper than GPT-4o."""
        mini = estimate_cost_usd("gpt-4o-mini", 1000, 1000)
        full = estimate_cost_usd("gpt-4o", 1000, 1000)
        assert mini < full

    def test_opus_most_expensive_claude(self):
        """Opus should be the most expensive Claude model."""
        opus = estimate_cost_usd("claude-opus-4-5", 1000, 1000)
        sonnet = estimate_cost_usd("claude-sonnet-4-5", 1000, 1000)
        haiku = estimate_cost_usd("claude-haiku-4-5", 1000, 1000)
        assert opus > sonnet > haiku

    def test_grok_variants(self):
        """Both Grok variants produce a positive cost estimate."""
        assert estimate_cost_usd("grok-3", 1000, 1000) > 0
        assert estimate_cost_usd("grok-3-fast", 1000, 1000) > 0

    def test_case_insensitive_matching(self):
        """Model name matching should be case insensitive."""
        cost_lower = estimate_cost_usd("claude-haiku-4-5", 1000, 0)
        cost_upper = estimate_cost_usd("CLAUDE-HAIKU-4-5", 1000, 0)
        cost_mixed = estimate_cost_usd("Claude-Haiku-4-5", 1000, 0)
        assert cost_lower == cost_upper == cost_mixed

    def test_returns_float(self):
        """Function should always return a float."""
        assert isinstance(estimate_cost_usd("haiku", 100, 200), float)
        assert isinstance(estimate_cost_usd("unknown-model", 100, 200), float)
        assert isinstance(estimate_cost_usd("haiku", 0, 0), float)


# ── Test BudgetTracker initialization ─────────────────────────────────────────


class TestBudgetTrackerInit:
    """Tests for BudgetTracker initialization."""

    def test_creates_with_memory_db(self):
        """Tracker should initialize with in-memory database."""
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker._db_ok is True

    def test_in_memory_fallback_empty_on_creation(self):
        """In-memory fallback should start empty."""
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker._in_memory == []

    def test_custom_db_path(self, tmp_path):
        """Tracker should use custom database path."""
        db_file = tmp_path / "custom_budget.db"
        tracker = BudgetTracker(db_path=str(db_file))
        assert tracker._db_ok is True
        assert tracker._db_path == str(db_file)
        assert db_file.exists()

    def test_db_path_directory_creation(self, tmp_path):
        """Tracker should create parent directories if needed."""
        db_file = tmp_path / "nested" / "dirs" / "budget.db"
        tracker = BudgetTracker(db_path=str(db_file))
        assert tracker._db_ok is True
        assert db_file.parent.exists()

    def test_invalid_db_path_fallback(self):
        """A tracker whose DB is marked unavailable still records spend in memory.

        ``__init__`` is bypassed via ``__new__`` and the attributes are set by
        hand, so this simulates the post-failure state rather than exercising
        the constructor's own error handling.
        """
        tracker = BudgetTracker.__new__(BudgetTracker)
        tracker._db_path = "/nonexistent/invalid/path/budget.db"
        tracker._lock = threading.Lock()
        tracker._in_memory = []
        tracker._db_ok = False

        cost = tracker.record_spend("test", "model", cost_usd=0.01)

        assert cost == 0.01
        # The record must have landed in the fallback store, not been dropped.
        assert len(tracker._in_memory) == 1


# ── Test BudgetTracker record_spend ───────────────────────────────────────────


class TestBudgetTrackerRecordSpend:
    """Tests for recording spend events."""

    def test_record_spend_returns_cost(self):
        """record_spend should return the calculated cost."""
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("anthropic", "claude-haiku-4-5", 100, 200)
        assert cost > 0

    def test_record_spend_explicit_cost(self):
        """record_spend should use explicit cost when provided."""
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("anthropic", "model", cost_usd=1.23)
        assert cost == pytest.approx(1.23)

    def test_record_spend_accumulates(self):
        """Multiple spend records should accumulate correctly."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
        tracker.record_spend("openai", "gpt-4o", cost_usd=0.02)
        assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)

    def test_record_spend_with_tier_label(self):
        """record_spend should accept custom tier labels."""
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("anthropic", "haiku", tier="cloud_api")
        assert cost >= 0

    def test_record_spend_with_provider(self):
        """Spend from different providers is summed into one daily total."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("openai", "gpt-4o", cost_usd=0.01)
        tracker.record_spend("anthropic", "claude-haiku", cost_usd=0.02)
        assert tracker.get_daily_spend() == pytest.approx(0.03, abs=1e-9)

    def test_record_zero_cost(self):
        """Recording zero cost should work correctly."""
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("test", "model", cost_usd=0.0)
        assert cost == 0.0
        assert tracker.get_daily_spend() == 0.0

    def test_record_negative_cost(self):
        """Recording negative cost (refund) should work."""
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("test", "model", cost_usd=-0.50)
        assert cost == -0.50
        assert tracker.get_daily_spend() == -0.50


# ── Test BudgetTracker daily/monthly spend queries ────────────────────────────


class TestBudgetTrackerSpendQueries:
    """Tests for daily and monthly spend queries."""

    def test_monthly_spend_includes_daily(self):
        """Monthly spend should be >= daily spend."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=5.00)
        assert tracker.get_monthly_spend() >= tracker.get_daily_spend()

    def test_get_daily_spend_empty(self):
        """Daily spend should be zero when no records."""
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker.get_daily_spend() == 0.0

    def test_get_monthly_spend_empty(self):
        """Monthly spend should be zero when no records."""
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker.get_monthly_spend() == 0.0

    def test_daily_spend_isolation(self):
        """Daily spend should only include today's records, not old ones."""
        tracker = BudgetTracker(db_path=":memory:")
        # Force the in-memory fallback so records with chosen timestamps can
        # be injected directly.
        tracker._db_ok = False

        today = date.today()
        day_start_ts = datetime.combine(today, datetime.min.time(), tzinfo=UTC).timestamp()
        # NOTE(review): assumes the tracker's monthly window opens on the first
        # of the calendar month, using the same date/UTC convention as the
        # day start above — confirm against models/budget.py.
        month_start_ts = datetime(today.year, today.month, 1, tzinfo=UTC).timestamp()

        # Record dated one hour into today.
        tracker._in_memory.append(
            SpendRecord(day_start_ts + 3600, "test", "model", 0, 0, 1.0, "cloud")
        )
        # Record dated two days ago.
        old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
        tracker._in_memory.append(
            SpendRecord(old_ts, "test", "old_model", 0, 0, 2.0, "cloud")
        )

        # Daily must only include today's 1.0.
        assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)

        # On the 1st or 2nd of a month the old record belongs to the previous
        # month, so the expected monthly total depends on the calendar date.
        expected_monthly = 3.0 if old_ts >= month_start_ts else 1.0
        assert tracker.get_monthly_spend() == pytest.approx(expected_monthly, abs=1e-9)


# ── Test BudgetTracker cloud_allowed ──────────────────────────────────────────


class TestBudgetTrackerCloudAllowed:
    """Tests for cloud budget limit enforcement."""

    def test_allowed_when_no_spend(self):
        """Cloud should be allowed when no spend recorded."""
        tracker = BudgetTracker(db_path=":memory:")
        assert tracker.cloud_allowed() is True

    def test_blocked_when_daily_limit_exceeded(self):
        """Cloud should be blocked when daily limit exceeded."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        # NOTE(review): relies on the ambient (unpatched) settings; the
        # original author expected a default daily limit of 5.0.
        assert tracker.cloud_allowed() is False

    def test_allowed_when_daily_limit_zero(self):
        """Cloud should be allowed when both limits are 0 (disabled)."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0  # disabled
            mock_settings.tier_cloud_monthly_budget_usd = 0  # disabled
            assert tracker.cloud_allowed() is True

    def test_blocked_when_monthly_limit_exceeded(self):
        """Cloud should be blocked when monthly limit exceeded."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("anthropic", "haiku", cost_usd=999.0)
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0  # daily disabled
            mock_settings.tier_cloud_monthly_budget_usd = 10.0
            assert tracker.cloud_allowed() is False

    def test_allowed_at_exact_daily_limit(self):
        """Spend exactly equal to the daily limit blocks cloud usage.

        The method name is historical: the asserted behaviour is that
        reaching the limit (spend >= limit) is treated as over budget.
        """
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 5.0
            mock_settings.tier_cloud_monthly_budget_usd = 0
            tracker.record_spend("test", "model", cost_usd=5.0)
            assert tracker.cloud_allowed() is False

    def test_allowed_below_daily_limit(self):
        """Cloud should be allowed when below daily limit."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 5.0
            mock_settings.tier_cloud_monthly_budget_usd = 0
            tracker.record_spend("test", "model", cost_usd=4.99)
            assert tracker.cloud_allowed() is True

    def test_zero_budget_blocks_all(self):
        """A near-zero daily budget blocks cloud usage once exceeded.

        A limit of exactly 0 means "disabled" in the other tests of this
        class, so the smallest practical budget (0.01) stands in for zero.
        """
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0.01
            mock_settings.tier_cloud_monthly_budget_usd = 0
            tracker.record_spend("test", "model", cost_usd=0.02)
            assert tracker.cloud_allowed() is False

    def test_both_limits_checked(self):
        """Being under the daily limit does not help when over the monthly one."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 100.0
            mock_settings.tier_cloud_monthly_budget_usd = 10.0
            tracker.record_spend("test", "model", cost_usd=15.0)
            assert tracker.cloud_allowed() is False


# ── Test BudgetTracker summary ────────────────────────────────────────────────


class TestBudgetTrackerSummary:
    """Tests for budget summary functionality."""

    def test_summary_keys_present(self):
        """Summary should contain all expected keys."""
        tracker = BudgetTracker(db_path=":memory:")
        summary = tracker.get_summary()
        expected_keys = (
            "daily_usd",
            "monthly_usd",
            "daily_limit_usd",
            "monthly_limit_usd",
            "daily_ok",
            "monthly_ok",
        )
        for key in expected_keys:
            assert key in summary

    def test_summary_daily_ok_true_on_empty(self):
        """daily_ok and monthly_ok should be True when empty."""
        tracker = BudgetTracker(db_path=":memory:")
        summary = tracker.get_summary()
        assert summary["daily_ok"] is True
        assert summary["monthly_ok"] is True

    def test_summary_daily_ok_false_when_exceeded(self):
        """daily_ok should be False when daily limit exceeded."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("openai", "gpt-4o", cost_usd=999.0)
        # NOTE(review): relies on the ambient (unpatched) daily limit being
        # enabled and below 999.
        summary = tracker.get_summary()
        assert summary["daily_ok"] is False

    def test_summary_monthly_ok_false_when_exceeded(self):
        """monthly_ok should be False when monthly limit exceeded."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0
            mock_settings.tier_cloud_monthly_budget_usd = 10.0
            tracker.record_spend("openai", "gpt-4o", cost_usd=15.0)
            summary = tracker.get_summary()
            assert summary["monthly_ok"] is False

    def test_summary_values_rounded(self):
        """daily_usd is reported rounded to 6 decimal places."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("test", "model", cost_usd=1.123456789)
        summary = tracker.get_summary()
        assert summary["daily_usd"] == 1.123457

    def test_summary_with_disabled_limits(self):
        """Summary should handle disabled limits (0)."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0
            mock_settings.tier_cloud_monthly_budget_usd = 0
            tracker.record_spend("test", "model", cost_usd=100.0)
            summary = tracker.get_summary()
            assert summary["daily_limit_usd"] == 0
            assert summary["monthly_limit_usd"] == 0
            assert summary["daily_ok"] is True
            assert summary["monthly_ok"] is True


# ── Test BudgetTracker in-memory fallback ─────────────────────────────────────


class TestBudgetTrackerInMemoryFallback:
    """Tests for in-memory fallback when DB is unavailable."""

    def test_in_memory_records_persisted(self):
        """Records should be stored in memory when DB is unavailable."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker._db_ok = False  # make the DB appear unavailable
        tracker.record_spend("test", "model", cost_usd=0.01)
        assert len(tracker._in_memory) == 1
        assert tracker._in_memory[0].cost_usd == 0.01

    def test_in_memory_query_spend(self):
        """Query spend should work with in-memory fallback."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker._db_ok = False
        tracker.record_spend("test", "model", cost_usd=0.01)
        since_ts = (datetime.now(UTC) - timedelta(hours=1)).timestamp()
        assert tracker._query_spend(since_ts) == 0.01

    def test_in_memory_older_records_not_counted(self):
        """In-memory records older than since_ts should not be counted."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker._db_ok = False
        old_ts = (datetime.now(UTC) - timedelta(days=2)).timestamp()
        tracker._in_memory.append(
            SpendRecord(old_ts, "test", "model", 0, 0, 1.0, "cloud")
        )
        # Only look back one day; the two-day-old record must be excluded.
        since_ts = (datetime.now(UTC) - timedelta(days=1)).timestamp()
        assert tracker._query_spend(since_ts) == 0.0


# ── Test BudgetTracker thread safety ──────────────────────────────────────────


class TestBudgetTrackerThreadSafety:
    """Tests for thread-safe operations."""

    def test_concurrent_record_spend(self):
        """Multiple threads should safely record spend concurrently."""
        tracker = BudgetTracker(db_path=":memory:")
        results = []
        errors = []

        def record_spends():
            # Exceptions raised in a worker thread would otherwise be lost,
            # so collect them for the main thread to assert on.
            try:
                for _ in range(10):
                    results.append(tracker.record_spend("test", "model", cost_usd=0.01))
            except Exception as exc:
                errors.append(exc)

        threads = [threading.Thread(target=record_spends) for _ in range(5)]
        for worker in threads:
            worker.start()
        for worker in threads:
            worker.join()

        # Comparing against [] makes pytest show the captured exceptions.
        assert errors == []
        assert len(results) == 50
        assert tracker.get_daily_spend() == pytest.approx(0.50, abs=1e-9)


# ── Test BudgetTracker edge cases ─────────────────────────────────────────────


class TestBudgetTrackerEdgeCases:
    """Tests for edge cases and boundary conditions."""

    def test_very_small_cost(self):
        """Tracker should handle very small costs."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("test", "model", cost_usd=0.000001)
        assert tracker.get_daily_spend() == pytest.approx(0.000001, abs=1e-9)

    def test_very_large_cost(self):
        """Tracker should handle very large costs."""
        tracker = BudgetTracker(db_path=":memory:")
        tracker.record_spend("test", "model", cost_usd=1_000_000.0)
        assert tracker.get_daily_spend() == pytest.approx(1_000_000.0, abs=1e-9)

    def test_many_records(self):
        """One hundred 0.01 records from distinct providers sum to 1.0."""
        tracker = BudgetTracker(db_path=":memory:")
        for i in range(100):
            tracker.record_spend(f"provider_{i}", f"model_{i}", cost_usd=0.01)
        assert tracker.get_daily_spend() == pytest.approx(1.0, abs=1e-9)

    def test_empty_provider_name(self):
        """Tracker should handle empty provider name."""
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("", "model", cost_usd=0.01)
        assert cost == 0.01

    def test_empty_model_name(self):
        """Tracker should handle empty model name."""
        tracker = BudgetTracker(db_path=":memory:")
        cost = tracker.record_spend("provider", "", cost_usd=0.01)
        assert cost == 0.01
# ── Test get_budget_tracker singleton ─────────────────────────────────────────


class TestGetBudgetTrackerSingleton:
    """Tests for the module-level BudgetTracker singleton."""

    @pytest.fixture
    def budget_module(self, monkeypatch):
        """Return the budget module with its cached singleton cleared.

        The reset goes through ``monkeypatch`` so the previous value of
        ``_budget_tracker`` is restored at teardown; assigning the attribute
        directly would leak the replaced singleton into later tests.
        """
        import infrastructure.models.budget as bmod

        monkeypatch.setattr(bmod, "_budget_tracker", None)
        return bmod

    def test_returns_budget_tracker(self, budget_module):
        """Singleton should return a BudgetTracker instance."""
        tracker = get_budget_tracker()
        assert isinstance(tracker, BudgetTracker)

    def test_returns_same_instance(self, budget_module):
        """Singleton should return the same instance."""
        first = get_budget_tracker()
        second = get_budget_tracker()
        assert first is second

    def test_singleton_persists_state(self, budget_module, monkeypatch):
        """Spend recorded through one singleton handle is visible via the next."""
        # Seed the singleton with an in-memory tracker so the spend recorded
        # below is not written through a tracker built on the default DB path.
        # NOTE(review): assumes get_budget_tracker() returns the cached
        # ``_budget_tracker`` when it is not None — the identity assert below
        # fails fast, before anything is recorded, if that is not the case.
        seeded = BudgetTracker(db_path=":memory:")
        monkeypatch.setattr(budget_module, "_budget_tracker", seeded)

        tracker1 = get_budget_tracker()
        assert tracker1 is seeded

        tracker1.record_spend("test", "model", cost_usd=1.0)

        tracker2 = get_budget_tracker()
        assert tracker1 is tracker2
        assert tracker2.get_daily_spend() == pytest.approx(1.0, abs=1e-9)


# ── Test BudgetTracker with mocked settings ───────────────────────────────────


class TestBudgetTrackerWithMockedSettings:
    """Tests using mocked settings for different scenarios."""

    def test_high_daily_limit(self):
        """Spend well under generous daily and monthly limits stays allowed."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 1000.0
            mock_settings.tier_cloud_monthly_budget_usd = 10000.0
            tracker.record_spend("test", "model", cost_usd=500.0)
            assert tracker.cloud_allowed() is True

    def test_low_daily_limit(self):
        """Spend above a low daily limit is blocked despite monthly headroom."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 1.0
            mock_settings.tier_cloud_monthly_budget_usd = 100.0
            tracker.record_spend("test", "model", cost_usd=2.0)
            assert tracker.cloud_allowed() is False

    def test_only_monthly_limit_enabled(self):
        """With the daily limit disabled, only the monthly limit can block."""
        tracker = BudgetTracker(db_path=":memory:")
        with patch("infrastructure.models.budget.settings") as mock_settings:
            mock_settings.tier_cloud_daily_budget_usd = 0  # Disabled
            mock_settings.tier_cloud_monthly_budget_usd = 50.0

            # 30.0 of 50.0 spent: still within the monthly budget.
            tracker.record_spend("test", "model", cost_usd=30.0)
            assert tracker.cloud_allowed() is True

            # 55.0 of 50.0 spent: the monthly budget is now exceeded.
            tracker.record_spend("test", "model", cost_usd=25.0)
            assert tracker.cloud_allowed() is False