Compare commits

...

2 Commits

Author SHA1 Message Date
80f536e319 fix: token budget tracker for context overflow (#925)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 42s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 39s
Tests / e2e (pull_request) Successful in 1m50s
Tests / test (pull_request) Failing after 50m49s
2026-04-21 04:43:07 +00:00
c6f2855745 fix: restore _format_error helper for test compatibility (#916)
Some checks failed
Docker Build and Publish / build-and-push (push) Has been skipped
Nix / nix (ubuntu-latest) (push) Failing after 2s
Tests / e2e (push) Successful in 2m47s
Tests / test (push) Failing after 27m41s
Build Skills Index / build-index (push) Has been skipped
Build Skills Index / deploy-with-index (push) Has been skipped
Nix / nix (macos-latest) (push) Has been cancelled
fix: restore _format_error helper for test compatibility (#916)
2026-04-20 23:56:27 +00:00
2 changed files with 193 additions and 0 deletions

165
agent/token_budget.py Normal file
View File

@@ -0,0 +1,165 @@
"""Token Budget — Poka-yoke guard against context overflow.
Progressive warning system with circuit breakers:
- 60%: Log warning, suggest summarization
- 80%: Auto-compress, drop raw tool outputs
- 90%: Block verbose tools, force wrap-up
- 95%: Graceful termination with summary
Usage:
from agent.token_budget import TokenBudget
budget = TokenBudget(max_tokens=128000)
budget.record_usage(prompt_tokens=500, completion_tokens=200)
status = budget.check()
# status.level: ok, warning, compress, block, terminate
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
class BudgetLevel(Enum):
"""Token budget alert levels."""
OK = "ok" # < 60%
WARNING = "warning" # 60-80%
COMPRESS = "compress" # 80-90%
BLOCK = "block" # 90-95%
TERMINATE = "terminate" # > 95%
@dataclass
class BudgetStatus:
"""Current budget status."""
level: BudgetLevel
used_tokens: int
max_tokens: int
percentage: float
remaining: int
message: str
actions: List[str] = field(default_factory=list)
# Default thresholds
THRESHOLDS = {
BudgetLevel.WARNING: 0.60,
BudgetLevel.COMPRESS: 0.80,
BudgetLevel.BLOCK: 0.90,
BudgetLevel.TERMINATE: 0.95,
}
class TokenBudget:
"""Track token usage and enforce context limits."""
def __init__(self, max_tokens: int = 128000,
thresholds: Optional[Dict[BudgetLevel, float]] = None):
self._max_tokens = max_tokens
self._thresholds = thresholds or THRESHOLDS
self._prompt_tokens = 0
self._completion_tokens = 0
self._tool_output_tokens = 0
self._history: List[Dict[str, Any]] = []
@property
def used_tokens(self) -> int:
return self._prompt_tokens + self._completion_tokens
@property
def remaining(self) -> int:
return max(0, self._max_tokens - self.used_tokens)
@property
def percentage(self) -> float:
if self._max_tokens == 0:
return 0
return self.used_tokens / self._max_tokens
def record_usage(self, prompt_tokens: int = 0, completion_tokens: int = 0,
tool_output_tokens: int = 0):
"""Record token usage from an API call."""
self._prompt_tokens += prompt_tokens
self._completion_tokens += completion_tokens
self._tool_output_tokens += tool_output_tokens
self._history.append({
"time": time.time(),
"prompt": prompt_tokens,
"completion": completion_tokens,
"tool_output": tool_output_tokens,
"total_used": self.used_tokens,
})
def check(self) -> BudgetStatus:
"""Check current budget status and return appropriate actions."""
pct = self.percentage
if pct >= self._thresholds.get(BudgetLevel.TERMINATE, 0.95):
level = BudgetLevel.TERMINATE
msg = f"Context {pct:.0%} full. Session must terminate with summary."
actions = ["generate_summary", "terminate_session"]
elif pct >= self._thresholds.get(BudgetLevel.BLOCK, 0.90):
level = BudgetLevel.BLOCK
msg = f"Context {pct:.0%} full. Blocking verbose tool calls."
actions = ["block_verbose_tools", "force_wrap_up", "suggest_summary"]
elif pct >= self._thresholds.get(BudgetLevel.COMPRESS, 0.80):
level = BudgetLevel.COMPRESS
msg = f"Context {pct:.0%} full. Auto-compressing conversation."
actions = ["auto_compress", "drop_raw_tool_outputs", "suggest_summary"]
elif pct >= self._thresholds.get(BudgetLevel.WARNING, 0.60):
level = BudgetLevel.WARNING
msg = f"Context {pct:.0%} used. Consider summarizing."
actions = ["suggest_summary", "log_warning"]
else:
level = BudgetLevel.OK
msg = f"Context OK: {self.used_tokens}/{self._max_tokens} tokens ({pct:.0%})"
actions = []
return BudgetStatus(
level=level,
used_tokens=self.used_tokens,
max_tokens=self._max_tokens,
percentage=round(pct, 3),
remaining=self.remaining,
message=msg,
actions=actions,
)
def should_truncate_tool_output(self, estimated_tokens: int) -> bool:
"""Check if a tool output should be truncated."""
if self.used_tokens + estimated_tokens > self._max_tokens * 0.95:
return True
return False
def get_truncation_budget(self) -> int:
"""Get max tokens available for next tool output."""
budget = self.remaining - int(self._max_tokens * 0.05) # Reserve 5%
return max(0, budget)
def reset(self):
"""Reset budget for new session."""
self._prompt_tokens = 0
self._completion_tokens = 0
self._tool_output_tokens = 0
self._history.clear()
def get_report(self) -> Dict[str, Any]:
"""Generate usage report."""
status = self.check()
return {
"status": status.level.value,
"used_tokens": self.used_tokens,
"max_tokens": self._max_tokens,
"remaining": self.remaining,
"percentage": status.percentage,
"prompt_tokens": self._prompt_tokens,
"completion_tokens": self._completion_tokens,
"tool_output_tokens": self._tool_output_tokens,
"message": status.message,
"actions": status.actions,
}

View File

@@ -44,6 +44,34 @@ from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
def _format_error(
message: str,
skill_name: str = None,
file_path: str = None,
suggestion: str = None,
context: dict = None,
) -> Dict[str, Any]:
"""Format an error with rich context for better debugging."""
parts = [message]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
# Import security scanner — agent-created skills get the same scrutiny as
# community hub installs.
try: