Compare commits

..

1 Commits

Author SHA1 Message Date
80f536e319 fix: token budget tracker for context overflow (#925)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 42s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 39s
Tests / e2e (pull_request) Successful in 1m50s
Tests / test (pull_request) Failing after 50m49s
2026-04-21 04:43:07 +00:00
2 changed files with 165 additions and 122 deletions

165
agent/token_budget.py Normal file
View File

@@ -0,0 +1,165 @@
"""Token Budget — Poka-yoke guard against context overflow.
Progressive warning system with circuit breakers:
- 60%: Log warning, suggest summarization
- 80%: Auto-compress, drop raw tool outputs
- 90%: Block verbose tools, force wrap-up
- 95%: Graceful termination with summary
Usage:
from agent.token_budget import TokenBudget
budget = TokenBudget(max_tokens=128000)
budget.record_usage(prompt_tokens=500, completion_tokens=200)
status = budget.check()
# status.level: ok, warning, compress, block, terminate
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
class BudgetLevel(Enum):
"""Token budget alert levels."""
OK = "ok" # < 60%
WARNING = "warning" # 60-80%
COMPRESS = "compress" # 80-90%
BLOCK = "block" # 90-95%
TERMINATE = "terminate" # > 95%
@dataclass
class BudgetStatus:
"""Current budget status."""
level: BudgetLevel
used_tokens: int
max_tokens: int
percentage: float
remaining: int
message: str
actions: List[str] = field(default_factory=list)
# Default thresholds
THRESHOLDS = {
BudgetLevel.WARNING: 0.60,
BudgetLevel.COMPRESS: 0.80,
BudgetLevel.BLOCK: 0.90,
BudgetLevel.TERMINATE: 0.95,
}
class TokenBudget:
"""Track token usage and enforce context limits."""
def __init__(self, max_tokens: int = 128000,
thresholds: Optional[Dict[BudgetLevel, float]] = None):
self._max_tokens = max_tokens
self._thresholds = thresholds or THRESHOLDS
self._prompt_tokens = 0
self._completion_tokens = 0
self._tool_output_tokens = 0
self._history: List[Dict[str, Any]] = []
@property
def used_tokens(self) -> int:
return self._prompt_tokens + self._completion_tokens
@property
def remaining(self) -> int:
return max(0, self._max_tokens - self.used_tokens)
@property
def percentage(self) -> float:
if self._max_tokens == 0:
return 0
return self.used_tokens / self._max_tokens
def record_usage(self, prompt_tokens: int = 0, completion_tokens: int = 0,
tool_output_tokens: int = 0):
"""Record token usage from an API call."""
self._prompt_tokens += prompt_tokens
self._completion_tokens += completion_tokens
self._tool_output_tokens += tool_output_tokens
self._history.append({
"time": time.time(),
"prompt": prompt_tokens,
"completion": completion_tokens,
"tool_output": tool_output_tokens,
"total_used": self.used_tokens,
})
def check(self) -> BudgetStatus:
"""Check current budget status and return appropriate actions."""
pct = self.percentage
if pct >= self._thresholds.get(BudgetLevel.TERMINATE, 0.95):
level = BudgetLevel.TERMINATE
msg = f"Context {pct:.0%} full. Session must terminate with summary."
actions = ["generate_summary", "terminate_session"]
elif pct >= self._thresholds.get(BudgetLevel.BLOCK, 0.90):
level = BudgetLevel.BLOCK
msg = f"Context {pct:.0%} full. Blocking verbose tool calls."
actions = ["block_verbose_tools", "force_wrap_up", "suggest_summary"]
elif pct >= self._thresholds.get(BudgetLevel.COMPRESS, 0.80):
level = BudgetLevel.COMPRESS
msg = f"Context {pct:.0%} full. Auto-compressing conversation."
actions = ["auto_compress", "drop_raw_tool_outputs", "suggest_summary"]
elif pct >= self._thresholds.get(BudgetLevel.WARNING, 0.60):
level = BudgetLevel.WARNING
msg = f"Context {pct:.0%} used. Consider summarizing."
actions = ["suggest_summary", "log_warning"]
else:
level = BudgetLevel.OK
msg = f"Context OK: {self.used_tokens}/{self._max_tokens} tokens ({pct:.0%})"
actions = []
return BudgetStatus(
level=level,
used_tokens=self.used_tokens,
max_tokens=self._max_tokens,
percentage=round(pct, 3),
remaining=self.remaining,
message=msg,
actions=actions,
)
def should_truncate_tool_output(self, estimated_tokens: int) -> bool:
"""Check if a tool output should be truncated."""
if self.used_tokens + estimated_tokens > self._max_tokens * 0.95:
return True
return False
def get_truncation_budget(self) -> int:
"""Get max tokens available for next tool output."""
budget = self.remaining - int(self._max_tokens * 0.05) # Reserve 5%
return max(0, budget)
def reset(self):
"""Reset budget for new session."""
self._prompt_tokens = 0
self._completion_tokens = 0
self._tool_output_tokens = 0
self._history.clear()
def get_report(self) -> Dict[str, Any]:
"""Generate usage report."""
status = self.check()
return {
"status": status.level.value,
"used_tokens": self.used_tokens,
"max_tokens": self._max_tokens,
"remaining": self.remaining,
"percentage": status.percentage,
"prompt_tokens": self._prompt_tokens,
"completion_tokens": self._completion_tokens,
"tool_output_tokens": self._tool_output_tokens,
"message": status.message,
"actions": status.actions,
}

View File

@@ -1,122 +0,0 @@
"""Skill Edit Guard — Poka-yoke auto-revert for incomplete skill edits.
Creates atomic skill edits with automatic rollback on failure.
Prevents broken skills from corrupting future sessions.
Usage:
from tools.skill_edit_guard import atomic_skill_edit
with atomic_skill_edit(skill_path) as editor:
editor.write(new_content)
# If exception occurs, file is automatically reverted
"""
from __future__ import annotations
import logging
import os
import shutil
import tempfile
import time
from contextlib import contextmanager
from pathlib import Path
from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
class SkillEditGuard:
"""Atomic skill file editing with auto-revert on failure."""
def __init__(self, skill_path: str):
self._path = Path(skill_path)
self._backup: Optional[Path] = None
self._committed = False
def backup(self) -> bool:
"""Create backup before editing."""
if not self._path.exists():
return True # New file, nothing to backup
backup_dir = self._path.parent / ".skill_backups"
backup_dir.mkdir(exist_ok=True)
ts = int(time.time() * 1000)
self._backup = backup_dir / f"{self._path.name}.{ts}.bak"
shutil.copy2(self._path, self._backup)
logger.debug("Skill backup created: %s", self._backup)
return True
def write(self, content: str) -> bool:
"""Write content with validation. Returns True if valid."""
# Validate YAML frontmatter
if content.startswith("---"):
end = content.find("---", 3)
if end < 0:
logger.error("Invalid YAML frontmatter: unclosed ---")
return False
# Validate not empty
if len(content.strip()) < 10:
logger.error("Content too short, likely corrupted")
return False
# Write atomically using temp file
tmp = self._path.with_suffix(".tmp")
try:
tmp.write_text(content, encoding="utf-8")
tmp.rename(self._path)
return True
except Exception as e:
logger.error("Write failed: %s", e)
if tmp.exists():
tmp.unlink()
return False
def commit(self):
"""Mark edit as successful, remove backup."""
self._committed = True
if self._backup and self._backup.exists():
self._backup.unlink()
logger.debug("Skill backup removed: %s", self._backup)
def rollback(self) -> bool:
"""Revert to backup."""
if self._backup and self._backup.exists():
shutil.copy2(self._backup, self._path)
self._backup.unlink()
logger.warning("Skill reverted from backup: %s", self._path)
return True
return False
def __enter__(self):
self.backup()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if exc_type is not None:
self.rollback()
return False # Re-raise exception
if not self._committed:
self.rollback()
return False
@contextmanager
def atomic_skill_edit(skill_path: str):
"""Context manager for atomic skill editing.
Usage:
with atomic_skill_edit("/path/to/skill/SKILL.md") as editor:
success = editor.write(new_content)
if not success:
raise ValueError("Write failed")
# __exit__ commits on success, reverts on exception
"""
guard = SkillEditGuard(skill_path)
guard.backup()
try:
yield guard
guard.commit()
except Exception:
guard.rollback()
raise