#!/usr/bin/env python3
"""
Tests for agent/token_budget.py — Poka-yoke context overflow guard.

The suite drives ``TokenBudget`` through ``update()`` with token counts placed
at chosen percentages of the context length and checks the reported level,
the derived flags, the tool-output budget, growth tracking, and the
human-readable status helpers.
"""

import sys
from pathlib import Path

import pytest

# Make the repository root importable so ``agent`` resolves when this file is
# run directly (see the ``__main__`` guard at the bottom).
sys.path.insert(0, str(Path(__file__).parent.parent))

from agent.token_budget import (
    TokenBudget,
    BudgetLevel,
    BudgetStatus,
    WARN_PERCENT,
    CAUTION_PERCENT,
    CRITICAL_PERCENT,
    STOP_PERCENT,
)


@pytest.fixture
def budget():
    """Standard 128K context budget."""
    return TokenBudget(context_length=128_000)


@pytest.fixture
def small_budget():
    """4K context for tight testing."""
    return TokenBudget(context_length=4_000)


# ── Threshold Levels ──────────────────────────────────────────────────

class TestThresholds:
    """Level and flag expectations on either side of each threshold."""

    def test_normal_below_60(self, budget):
        """Below 60% the level is NORMAL and no flag is raised."""
        budget.update(50_000)  # 39%
        status = budget.check()
        assert status.level == BudgetLevel.NORMAL
        assert not status.should_compress
        assert not status.should_block_tools
        assert not status.should_terminate

    def test_warning_at_60(self, budget):
        """Just above 60% the level is WARNING without compress/block flags."""
        budget.update(int(128_000 * 0.62))  # 62%
        status = budget.check()
        assert status.level == BudgetLevel.WARNING
        assert not status.should_compress
        assert not status.should_block_tools

    def test_caution_at_80(self, budget):
        """Just above 80% the level is CAUTION and only compression is flagged."""
        budget.update(int(128_000 * 0.82))  # 82%
        status = budget.check()
        assert status.level == BudgetLevel.CAUTION
        assert status.should_compress
        assert not status.should_block_tools
        assert not status.should_terminate

    def test_critical_at_90(self, budget):
        """Just above 90% the level is CRITICAL: compress and block tools."""
        budget.update(int(128_000 * 0.91))  # 91%
        status = budget.check()
        assert status.level == BudgetLevel.CRITICAL
        assert status.should_compress
        assert status.should_block_tools
        assert not status.should_terminate

    def test_stop_at_95(self, budget):
        """Just above 95% the level is STOP and every flag is raised."""
        budget.update(int(128_000 * 0.96))  # 96%
        status = budget.check()
        assert status.level == BudgetLevel.STOP
        assert status.should_compress
        assert status.should_block_tools
        assert status.should_terminate

    def test_small_context_thresholds(self, small_budget):
        """The same level progression is expected on a 4K context."""
        # 4K * 0.60 = 2400
        small_budget.update(2450)
        assert small_budget.check().level == BudgetLevel.WARNING

        small_budget.update(3250)  # 4K * 0.81
        assert small_budget.check().level == BudgetLevel.CAUTION

        small_budget.update(3650)  # 4K * 0.91
        assert small_budget.check().level == BudgetLevel.CRITICAL

        small_budget.update(3850)  # 4K * 0.96
        assert small_budget.check().level == BudgetLevel.STOP


# ── Convenience Methods ───────────────────────────────────────────────

class TestConvenienceMethods:
    """Boolean helpers on TokenBudget, checked one percent apart."""

    def test_should_compress(self, budget):
        """should_compress() is False at 79% and True at 80%."""
        budget.update(int(128_000 * 0.79))
        assert not budget.should_compress()
        budget.update(int(128_000 * 0.80))
        assert budget.should_compress()

    def test_should_block_tools(self, budget):
        """should_block_tools() is False at 89% and True at 90%."""
        budget.update(int(128_000 * 0.89))
        assert not budget.should_block_tools()
        budget.update(int(128_000 * 0.90))
        assert budget.should_block_tools()

    def test_should_terminate(self, budget):
        """should_terminate() is False at 94% and True at 95%."""
        budget.update(int(128_000 * 0.94))
        assert not budget.should_terminate()
        budget.update(int(128_000 * 0.95))
        assert budget.should_terminate()


# ── Tool Output Budgeting ─────────────────────────────────────────────

class TestToolOutputBudget:
    """Expected tool_output_budget() values and truncate_tool_output() results."""

    def test_normal_budget(self, budget):
        """At 50% usage the tool output budget is 50,000."""
        budget.update(int(128_000 * 0.50))
        assert budget.tool_output_budget() == 50_000

    def test_warning_budget(self, budget):
        """At 65% usage the tool output budget is 20,000."""
        budget.update(int(128_000 * 0.65))
        assert budget.tool_output_budget() == 20_000

    def test_caution_budget(self, budget):
        """At 85% usage the tool output budget is 8,000."""
        budget.update(int(128_000 * 0.85))
        assert budget.tool_output_budget() == 8_000

    def test_critical_budget(self, budget):
        """At 92% usage the tool output budget is 2,000."""
        budget.update(int(128_000 * 0.92))
        assert budget.tool_output_budget() == 2_000

    def test_truncate_short_unchanged(self, budget):
        """Text shorter than max_chars is returned unchanged."""
        result = budget.truncate_tool_output("short text", max_chars=1000)
        assert result == "short text"

    def test_truncate_long(self, budget):
        """Long text is cut to about max_chars, keeping head and tail."""
        long_text = "A" * 100_000
        result = budget.truncate_tool_output(long_text, max_chars=5_000)
        assert len(result) <= 5_100  # small overhead for notice
        assert "truncated" in result
        assert "A" in result[:2500]  # head preserved
        assert "A" in result[-2500:]  # tail preserved

    def test_truncate_very_small(self, budget):
        """A very small max_chars still yields a bounded result with a notice."""
        long_text = "X" * 1000
        result = budget.truncate_tool_output(long_text, max_chars=50)
        assert len(result) <= 50 + 20
        assert "truncated" in result


# ── Growth Tracking ───────────────────────────────────────────────────

class TestGrowthTracking:
    """growth_rate() and turns_remaining() as derived from update() history."""

    def test_growth_rate(self, budget):
        """Three updates spaced 5,000 apart give a growth rate of 5000.0."""
        budget.update(10_000)
        budget.update(15_000)
        budget.update(20_000)
        assert budget.growth_rate() == 5_000.0

    def test_turns_remaining(self, budget):
        """turns_remaining() lands in a tolerance band around the estimate."""
        budget.update(10_000)
        budget.update(15_000)
        budget.update(20_000)
        # rate=5000, remaining=108000, turns=~21
        turns = budget.turns_remaining()
        assert turns is not None
        assert 18 <= turns <= 24

    def test_no_history(self, budget):
        """With no updates recorded, both estimates are None."""
        assert budget.growth_rate() is None
        assert budget.turns_remaining() is None


# ── Status Indicators ─────────────────────────────────────────────────

class TestStatusIndicators:
    """Human-readable renderings of a status and the budget summary."""

    def test_indicator_normal(self, budget):
        """The indicator at 50% usage contains the text "50"."""
        budget.update(int(128_000 * 0.50))
        status = budget.check()
        indicator = status.to_indicator()
        assert "50" in indicator

    def test_indicator_warning(self, budget):
        """The indicator at 65% contains a warning sign or the text "65"."""
        budget.update(int(128_000 * 0.65))
        status = budget.check()
        indicator = status.to_indicator()
        assert "\u26a0" in indicator or "65" in indicator

    def test_bar(self, budget):
        """The bar rendering at 50% usage contains the text "50"."""
        budget.update(int(128_000 * 0.50))
        status = budget.check()
        bar = status.to_bar()
        assert "50" in bar

    def test_summary(self, budget):
        """The summary shows used tokens, context length and level name."""
        budget.update(50_000)
        summary = budget.summary()
        assert "50,000" in summary
        assert "128,000" in summary
        assert "NORMAL" in summary


# ── Reset ─────────────────────────────────────────────────────────────

class TestReset:
    """State after calling reset()."""

    def test_reset_clears_state(self, budget):
        """reset() zeroes usage, returns to NORMAL and clears growth history."""
        budget.update(int(128_000 * 0.90))
        budget.reset()
        assert budget.tokens_used == 0
        assert budget.check().level == BudgetLevel.NORMAL
        assert budget.growth_rate() is None


# ── Edge Cases ────────────────────────────────────────────────────────

class TestEdgeCases:
    """Boundary and degenerate inputs."""

    def test_exact_threshold_boundary(self, budget):
        """Usage exactly at 60% already counts as WARNING."""
        # Exactly at 60%
        budget.update(int(128_000 * 0.60))
        assert budget.check().level == BudgetLevel.WARNING

    def test_zero_context(self):
        """A zero-length context reports 0 percent used."""
        budget = TokenBudget(context_length=0)
        status = budget.check()
        assert status.percent_used == 0

    def test_remaining_for_response(self, budget):
        """remaining_for_response() is positive and below the context length."""
        budget.update(100_000)
        remaining = budget.remaining_for_response()
        # 128000 - 100000 - 6400 (5% reserve) = 21600
        assert remaining > 0
        assert remaining < 128_000


if __name__ == "__main__":
    pytest.main([__file__, "-v"])