Compare commits

..

2 Commits

Author SHA1 Message Date
80f536e319 fix: token budget tracker for context overflow (#925)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 42s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 39s
Tests / e2e (pull_request) Successful in 1m50s
Tests / test (pull_request) Failing after 50m49s
2026-04-21 04:43:07 +00:00
c6f2855745 fix: restore _format_error helper for test compatibility (#916)
Some checks failed
Docker Build and Publish / build-and-push (push) Has been skipped
Nix / nix (ubuntu-latest) (push) Failing after 2s
Tests / e2e (push) Successful in 2m47s
Tests / test (push) Failing after 27m41s
Build Skills Index / build-index (push) Has been skipped
Build Skills Index / deploy-with-index (push) Has been skipped
Nix / nix (macos-latest) (push) Has been cancelled
fix: restore _format_error helper for test compatibility (#916)
2026-04-20 23:56:27 +00:00
3 changed files with 193 additions and 224 deletions

165
agent/token_budget.py Normal file
View File

@@ -0,0 +1,165 @@
"""Token Budget — Poka-yoke guard against context overflow.
Progressive warning system with circuit breakers:
- 60%: Log warning, suggest summarization
- 80%: Auto-compress, drop raw tool outputs
- 90%: Block verbose tools, force wrap-up
- 95%: Graceful termination with summary
Usage:
from agent.token_budget import TokenBudget
budget = TokenBudget(max_tokens=128000)
budget.record_usage(prompt_tokens=500, completion_tokens=200)
status = budget.check()
# status.level: ok, warning, compress, block, terminate
"""
from __future__ import annotations
import logging
import time
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
logger = logging.getLogger(__name__)
class BudgetLevel(Enum):
"""Token budget alert levels."""
OK = "ok" # < 60%
WARNING = "warning" # 60-80%
COMPRESS = "compress" # 80-90%
BLOCK = "block" # 90-95%
TERMINATE = "terminate" # > 95%
@dataclass
class BudgetStatus:
"""Current budget status."""
level: BudgetLevel
used_tokens: int
max_tokens: int
percentage: float
remaining: int
message: str
actions: List[str] = field(default_factory=list)
# Default thresholds
THRESHOLDS = {
BudgetLevel.WARNING: 0.60,
BudgetLevel.COMPRESS: 0.80,
BudgetLevel.BLOCK: 0.90,
BudgetLevel.TERMINATE: 0.95,
}
class TokenBudget:
"""Track token usage and enforce context limits."""
def __init__(self, max_tokens: int = 128000,
thresholds: Optional[Dict[BudgetLevel, float]] = None):
self._max_tokens = max_tokens
self._thresholds = thresholds or THRESHOLDS
self._prompt_tokens = 0
self._completion_tokens = 0
self._tool_output_tokens = 0
self._history: List[Dict[str, Any]] = []
@property
def used_tokens(self) -> int:
return self._prompt_tokens + self._completion_tokens
@property
def remaining(self) -> int:
return max(0, self._max_tokens - self.used_tokens)
@property
def percentage(self) -> float:
if self._max_tokens == 0:
return 0
return self.used_tokens / self._max_tokens
def record_usage(self, prompt_tokens: int = 0, completion_tokens: int = 0,
tool_output_tokens: int = 0):
"""Record token usage from an API call."""
self._prompt_tokens += prompt_tokens
self._completion_tokens += completion_tokens
self._tool_output_tokens += tool_output_tokens
self._history.append({
"time": time.time(),
"prompt": prompt_tokens,
"completion": completion_tokens,
"tool_output": tool_output_tokens,
"total_used": self.used_tokens,
})
def check(self) -> BudgetStatus:
"""Check current budget status and return appropriate actions."""
pct = self.percentage
if pct >= self._thresholds.get(BudgetLevel.TERMINATE, 0.95):
level = BudgetLevel.TERMINATE
msg = f"Context {pct:.0%} full. Session must terminate with summary."
actions = ["generate_summary", "terminate_session"]
elif pct >= self._thresholds.get(BudgetLevel.BLOCK, 0.90):
level = BudgetLevel.BLOCK
msg = f"Context {pct:.0%} full. Blocking verbose tool calls."
actions = ["block_verbose_tools", "force_wrap_up", "suggest_summary"]
elif pct >= self._thresholds.get(BudgetLevel.COMPRESS, 0.80):
level = BudgetLevel.COMPRESS
msg = f"Context {pct:.0%} full. Auto-compressing conversation."
actions = ["auto_compress", "drop_raw_tool_outputs", "suggest_summary"]
elif pct >= self._thresholds.get(BudgetLevel.WARNING, 0.60):
level = BudgetLevel.WARNING
msg = f"Context {pct:.0%} used. Consider summarizing."
actions = ["suggest_summary", "log_warning"]
else:
level = BudgetLevel.OK
msg = f"Context OK: {self.used_tokens}/{self._max_tokens} tokens ({pct:.0%})"
actions = []
return BudgetStatus(
level=level,
used_tokens=self.used_tokens,
max_tokens=self._max_tokens,
percentage=round(pct, 3),
remaining=self.remaining,
message=msg,
actions=actions,
)
def should_truncate_tool_output(self, estimated_tokens: int) -> bool:
"""Check if a tool output should be truncated."""
if self.used_tokens + estimated_tokens > self._max_tokens * 0.95:
return True
return False
def get_truncation_budget(self) -> int:
"""Get max tokens available for next tool output."""
budget = self.remaining - int(self._max_tokens * 0.05) # Reserve 5%
return max(0, budget)
def reset(self):
"""Reset budget for new session."""
self._prompt_tokens = 0
self._completion_tokens = 0
self._tool_output_tokens = 0
self._history.clear()
def get_report(self) -> Dict[str, Any]:
"""Generate usage report."""
status = self.check()
return {
"status": status.level.value,
"used_tokens": self.used_tokens,
"max_tokens": self._max_tokens,
"remaining": self.remaining,
"percentage": status.percentage,
"prompt_tokens": self._prompt_tokens,
"completion_tokens": self._completion_tokens,
"tool_output_tokens": self._tool_output_tokens,
"message": status.message,
"actions": status.actions,
}

View File

@@ -1,224 +0,0 @@
"""
Gateway Config Validator & Fallback Fix — #892.
Validates gateway configuration and provides sensible defaults
for missing keys to prevent fallback chain breaks.
"""
import logging
import os
from typing import Dict, Any, List, Optional
from dataclasses import dataclass, field
logger = logging.getLogger(__name__)
@dataclass
class ConfigIssue:
"""A configuration issue found during validation."""
key: str
severity: str # error, warning, info
message: str
fix: str
@dataclass
class ConfigValidation:
"""Result of config validation."""
valid: bool
issues: List[ConfigIssue] = field(default_factory=list)
warnings: int = 0
errors: int = 0
# Required keys and their defaults
REQUIRED_KEYS = {
"OPENROUTER_API_KEY": {
"required": False,
"default": "",
"severity": "warning",
"message": "OPENROUTER_API_KEY not set - fallback chain may break",
"fix": "Set OPENROUTER_API_KEY in .env for OpenRouter provider",
},
"API_SERVER_KEY": {
"required": False,
"default": "",
"severity": "warning",
"message": "API_SERVER_KEY not configured",
"fix": "Set API_SERVER_KEY in .env for API server auth",
},
"GITEA_TOKEN": {
"required": False,
"default": "",
"severity": "info",
"message": "GITEA_TOKEN not set - Gitea features disabled",
"fix": "Set GITEA_TOKEN in .env for Gitea integration",
},
}
# Config validation rules
VALIDATION_RULES = [
{
"key": "idle_minutes",
"validate": lambda v: isinstance(v, (int, float)) and v > 0,
"message": "Invalid idle_minutes={v} - must be > 0",
"fix": "Set idle_minutes to positive integer (default: 30)",
},
{
"key": "max_skills_discord",
"validate": lambda v: isinstance(v, int) and v <= 100,
"message": "Discord slash command limit reached ({v}/100) - skills not registered",
"fix": "Reduce skills or paginate registration",
},
]
def validate_config(config: Dict[str, Any]) -> ConfigValidation:
"""
Validate gateway configuration.
Args:
config: Configuration dictionary
Returns:
ConfigValidation with issues found
"""
issues = []
# Check required keys
for key, spec in REQUIRED_KEYS.items():
value = config.get(key) or os.environ.get(key) or spec["default"]
if spec["required"] and not value:
issues.append(ConfigIssue(
key=key,
severity=spec["severity"],
message=spec["message"],
fix=spec["fix"],
))
elif not value and spec["severity"] != "error":
issues.append(ConfigIssue(
key=key,
severity=spec["severity"],
message=spec["message"],
fix=spec["fix"],
))
# Check validation rules
for rule in VALIDATION_RULES:
value = config.get(rule["key"])
if value is not None:
if not rule["validate"](value):
issues.append(ConfigIssue(
key=rule["key"],
severity="error",
message=rule["message"].format(v=value),
fix=rule["fix"],
))
errors = sum(1 for i in issues if i.severity == "error")
warnings = sum(1 for i in issues if i.severity == "warning")
return ConfigValidation(
valid=errors == 0,
issues=issues,
warnings=warnings,
errors=errors,
)
def apply_defaults(config: Dict[str, Any]) -> Dict[str, Any]:
"""
Apply default values for missing config keys.
Args:
config: Configuration dictionary
Returns:
Config with defaults applied
"""
result = dict(config)
for key, spec in REQUIRED_KEYS.items():
if key not in result or not result[key]:
default = os.environ.get(key) or spec["default"]
if default:
result[key] = default
logger.debug("Applied default for %s", key)
# Apply validation defaults
if "idle_minutes" not in result or not result["idle_minutes"] or result["idle_minutes"] <= 0:
result["idle_minutes"] = 30
logger.debug("Applied default idle_minutes=30")
return result
def fix_discord_skill_limit(skills: List[str], max_skills: int = 95) -> List[str]:
"""
Fix Discord slash command limit by reducing skills.
Args:
skills: List of skill names
max_skills: Maximum skills to register (default 95, leaving room for built-ins)
Returns:
Reduced skill list
"""
if len(skills) <= max_skills:
return skills
logger.warning(
"Discord skill limit: %d skills exceeds %d limit, truncating",
len(skills), max_skills
)
# Keep first max_skills (alphabetical priority)
return sorted(skills)[:max_skills]
def validate_provider_config(provider: str, config: Dict[str, Any]) -> ConfigIssue:
"""
Validate provider-specific configuration.
Args:
provider: Provider name
config: Provider config
Returns:
ConfigIssue if invalid, None if valid
"""
if provider == "local-llama.cpp":
# Check if llama.cpp is configured
if not config.get("model_path") and not config.get("base_url"):
return ConfigIssue(
key=f"provider.{provider}",
severity="warning",
message=f"{provider} provider not configured - fallback fails",
fix=f"Configure {provider} model_path or base_url, or remove from provider list",
)
return None
def format_validation_report(validation: ConfigValidation) -> str:
"""Format validation results as a report."""
lines = [
"=" * 50,
"GATEWAY CONFIG VALIDATION",
"=" * 50,
"",
f"Status: {'VALID' if validation.valid else 'INVALID'}",
f"Errors: {validation.errors}",
f"Warnings: {validation.warnings}",
"",
]
if validation.issues:
lines.append("Issues:")
for issue in validation.issues:
icon = "" if issue.severity == "error" else "⚠️" if issue.severity == "warning" else ""
lines.append(f" {icon} [{issue.key}] {issue.message}")
lines.append(f" Fix: {issue.fix}")
lines.append("")
return "\n".join(lines)

View File

@@ -44,6 +44,34 @@ from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
def _format_error(
message: str,
skill_name: str = None,
file_path: str = None,
suggestion: str = None,
context: dict = None,
) -> Dict[str, Any]:
"""Format an error with rich context for better debugging."""
parts = [message]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
# Import security scanner — agent-created skills get the same scrutiny as
# community hub installs.
try: