Compare commits
1 Commits
fix/925
...
feat/queue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
73984ca72f |
@@ -1,165 +0,0 @@
|
||||
"""Token Budget — Poka-yoke guard against context overflow.
|
||||
|
||||
Progressive warning system with circuit breakers:
|
||||
- 60%: Log warning, suggest summarization
|
||||
- 80%: Auto-compress, drop raw tool outputs
|
||||
- 90%: Block verbose tools, force wrap-up
|
||||
- 95%: Graceful termination with summary
|
||||
|
||||
Usage:
|
||||
from agent.token_budget import TokenBudget
|
||||
budget = TokenBudget(max_tokens=128000)
|
||||
budget.record_usage(prompt_tokens=500, completion_tokens=200)
|
||||
status = budget.check()
|
||||
# status.level: ok, warning, compress, block, terminate
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class BudgetLevel(Enum):
|
||||
"""Token budget alert levels."""
|
||||
OK = "ok" # < 60%
|
||||
WARNING = "warning" # 60-80%
|
||||
COMPRESS = "compress" # 80-90%
|
||||
BLOCK = "block" # 90-95%
|
||||
TERMINATE = "terminate" # > 95%
|
||||
|
||||
|
||||
@dataclass
|
||||
class BudgetStatus:
|
||||
"""Current budget status."""
|
||||
level: BudgetLevel
|
||||
used_tokens: int
|
||||
max_tokens: int
|
||||
percentage: float
|
||||
remaining: int
|
||||
message: str
|
||||
actions: List[str] = field(default_factory=list)
|
||||
|
||||
|
||||
# Default thresholds
|
||||
THRESHOLDS = {
|
||||
BudgetLevel.WARNING: 0.60,
|
||||
BudgetLevel.COMPRESS: 0.80,
|
||||
BudgetLevel.BLOCK: 0.90,
|
||||
BudgetLevel.TERMINATE: 0.95,
|
||||
}
|
||||
|
||||
|
||||
class TokenBudget:
|
||||
"""Track token usage and enforce context limits."""
|
||||
|
||||
def __init__(self, max_tokens: int = 128000,
|
||||
thresholds: Optional[Dict[BudgetLevel, float]] = None):
|
||||
self._max_tokens = max_tokens
|
||||
self._thresholds = thresholds or THRESHOLDS
|
||||
self._prompt_tokens = 0
|
||||
self._completion_tokens = 0
|
||||
self._tool_output_tokens = 0
|
||||
self._history: List[Dict[str, Any]] = []
|
||||
|
||||
@property
|
||||
def used_tokens(self) -> int:
|
||||
return self._prompt_tokens + self._completion_tokens
|
||||
|
||||
@property
|
||||
def remaining(self) -> int:
|
||||
return max(0, self._max_tokens - self.used_tokens)
|
||||
|
||||
@property
|
||||
def percentage(self) -> float:
|
||||
if self._max_tokens == 0:
|
||||
return 0
|
||||
return self.used_tokens / self._max_tokens
|
||||
|
||||
def record_usage(self, prompt_tokens: int = 0, completion_tokens: int = 0,
|
||||
tool_output_tokens: int = 0):
|
||||
"""Record token usage from an API call."""
|
||||
self._prompt_tokens += prompt_tokens
|
||||
self._completion_tokens += completion_tokens
|
||||
self._tool_output_tokens += tool_output_tokens
|
||||
self._history.append({
|
||||
"time": time.time(),
|
||||
"prompt": prompt_tokens,
|
||||
"completion": completion_tokens,
|
||||
"tool_output": tool_output_tokens,
|
||||
"total_used": self.used_tokens,
|
||||
})
|
||||
|
||||
def check(self) -> BudgetStatus:
|
||||
"""Check current budget status and return appropriate actions."""
|
||||
pct = self.percentage
|
||||
|
||||
if pct >= self._thresholds.get(BudgetLevel.TERMINATE, 0.95):
|
||||
level = BudgetLevel.TERMINATE
|
||||
msg = f"Context {pct:.0%} full. Session must terminate with summary."
|
||||
actions = ["generate_summary", "terminate_session"]
|
||||
elif pct >= self._thresholds.get(BudgetLevel.BLOCK, 0.90):
|
||||
level = BudgetLevel.BLOCK
|
||||
msg = f"Context {pct:.0%} full. Blocking verbose tool calls."
|
||||
actions = ["block_verbose_tools", "force_wrap_up", "suggest_summary"]
|
||||
elif pct >= self._thresholds.get(BudgetLevel.COMPRESS, 0.80):
|
||||
level = BudgetLevel.COMPRESS
|
||||
msg = f"Context {pct:.0%} full. Auto-compressing conversation."
|
||||
actions = ["auto_compress", "drop_raw_tool_outputs", "suggest_summary"]
|
||||
elif pct >= self._thresholds.get(BudgetLevel.WARNING, 0.60):
|
||||
level = BudgetLevel.WARNING
|
||||
msg = f"Context {pct:.0%} used. Consider summarizing."
|
||||
actions = ["suggest_summary", "log_warning"]
|
||||
else:
|
||||
level = BudgetLevel.OK
|
||||
msg = f"Context OK: {self.used_tokens}/{self._max_tokens} tokens ({pct:.0%})"
|
||||
actions = []
|
||||
|
||||
return BudgetStatus(
|
||||
level=level,
|
||||
used_tokens=self.used_tokens,
|
||||
max_tokens=self._max_tokens,
|
||||
percentage=round(pct, 3),
|
||||
remaining=self.remaining,
|
||||
message=msg,
|
||||
actions=actions,
|
||||
)
|
||||
|
||||
def should_truncate_tool_output(self, estimated_tokens: int) -> bool:
|
||||
"""Check if a tool output should be truncated."""
|
||||
if self.used_tokens + estimated_tokens > self._max_tokens * 0.95:
|
||||
return True
|
||||
return False
|
||||
|
||||
def get_truncation_budget(self) -> int:
|
||||
"""Get max tokens available for next tool output."""
|
||||
budget = self.remaining - int(self._max_tokens * 0.05) # Reserve 5%
|
||||
return max(0, budget)
|
||||
|
||||
def reset(self):
|
||||
"""Reset budget for new session."""
|
||||
self._prompt_tokens = 0
|
||||
self._completion_tokens = 0
|
||||
self._tool_output_tokens = 0
|
||||
self._history.clear()
|
||||
|
||||
def get_report(self) -> Dict[str, Any]:
|
||||
"""Generate usage report."""
|
||||
status = self.check()
|
||||
return {
|
||||
"status": status.level.value,
|
||||
"used_tokens": self.used_tokens,
|
||||
"max_tokens": self._max_tokens,
|
||||
"remaining": self.remaining,
|
||||
"percentage": status.percentage,
|
||||
"prompt_tokens": self._prompt_tokens,
|
||||
"completion_tokens": self._completion_tokens,
|
||||
"tool_output_tokens": self._tool_output_tokens,
|
||||
"message": status.message,
|
||||
"actions": status.actions,
|
||||
}
|
||||
147
scripts/queue_health_check.py
Executable file
147
scripts/queue_health_check.py
Executable file
@@ -0,0 +1,147 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Queue Health Check — Verify dispatch queue is operational.
|
||||
|
||||
Checks:
|
||||
1. Queue file exists and is readable
|
||||
2. Queue has pending items
|
||||
3. Queue is not stuck (items processing)
|
||||
4. Queue age (stale items)
|
||||
|
||||
Usage:
|
||||
python scripts/queue_health_check.py
|
||||
python scripts/queue_health_check.py --json
|
||||
"""
|
||||
|
||||
import json
|
||||
import sys
|
||||
from datetime import datetime, timedelta
|
||||
from pathlib import Path
|
||||
|
||||
|
||||
def check_queue_health(queue_path: str = "~/.hermes/queue.json") -> dict:
|
||||
"""Check queue health status."""
|
||||
path = Path(queue_path).expanduser()
|
||||
|
||||
result = {
|
||||
"healthy": True,
|
||||
"checks": {},
|
||||
"warnings": [],
|
||||
"errors": []
|
||||
}
|
||||
|
||||
# Check 1: File exists
|
||||
if not path.exists():
|
||||
result["healthy"] = False
|
||||
result["errors"].append(f"Queue file not found: {path}")
|
||||
result["checks"]["file_exists"] = False
|
||||
return result
|
||||
|
||||
result["checks"]["file_exists"] = True
|
||||
|
||||
# Check 2: File is readable
|
||||
try:
|
||||
with open(path, "r") as f:
|
||||
data = json.load(f)
|
||||
except Exception as e:
|
||||
result["healthy"] = False
|
||||
result["errors"].append(f"Cannot read queue: {e}")
|
||||
result["checks"]["readable"] = False
|
||||
return result
|
||||
|
||||
result["checks"]["readable"] = True
|
||||
|
||||
# Check 3: Queue structure
|
||||
if not isinstance(data, dict):
|
||||
result["healthy"] = False
|
||||
result["errors"].append("Queue is not a dict")
|
||||
result["checks"]["valid_structure"] = False
|
||||
return result
|
||||
|
||||
result["checks"]["valid_structure"] = True
|
||||
|
||||
# Check 4: Pending items
|
||||
pending = data.get("pending", [])
|
||||
processing = data.get("processing", [])
|
||||
completed = data.get("completed", [])
|
||||
|
||||
result["checks"]["pending_count"] = len(pending)
|
||||
result["checks"]["processing_count"] = len(processing)
|
||||
result["checks"]["completed_count"] = len(completed)
|
||||
|
||||
if len(pending) == 0 and len(processing) == 0:
|
||||
result["warnings"].append("Queue is empty")
|
||||
|
||||
# Check 5: Stale processing items
|
||||
now = datetime.now()
|
||||
stale_threshold = timedelta(hours=1)
|
||||
|
||||
for item in processing:
|
||||
started = item.get("started_at")
|
||||
if started:
|
||||
try:
|
||||
started_time = datetime.fromisoformat(started.replace("Z", "+00:00"))
|
||||
if now - started_time > stale_threshold:
|
||||
result["warnings"].append(f"Stale item: {item.get('id', 'unknown')} (started {started})")
|
||||
except:
|
||||
pass
|
||||
|
||||
# Check 6: Queue age
|
||||
if pending:
|
||||
oldest = min(pending, key=lambda x: x.get("added_at", ""))
|
||||
added = oldest.get("added_at")
|
||||
if added:
|
||||
try:
|
||||
added_time = datetime.fromisoformat(added.replace("Z", "+00:00"))
|
||||
age = now - added_time
|
||||
if age > timedelta(hours=24):
|
||||
result["warnings"].append(f"Old item in queue: {oldest.get('id', 'unknown')} (added {added})")
|
||||
except:
|
||||
pass
|
||||
|
||||
return result
|
||||
|
||||
|
||||
def main():
|
||||
"""Main function."""
|
||||
import argparse
|
||||
|
||||
parser = argparse.ArgumentParser(description="Queue health check")
|
||||
parser.add_argument("--queue", default="~/.hermes/queue.json", help="Queue file path")
|
||||
parser.add_argument("--json", action="store_true", help="Output as JSON")
|
||||
args = parser.parse_args()
|
||||
|
||||
result = check_queue_health(args.queue)
|
||||
|
||||
if args.json:
|
||||
print(json.dumps(result, indent=2))
|
||||
else:
|
||||
print("Queue Health Check")
|
||||
print("=" * 50)
|
||||
print(f"Healthy: {'✓' if result['healthy'] else '✗'}")
|
||||
print()
|
||||
|
||||
print("Checks:")
|
||||
for check, value in result["checks"].items():
|
||||
if isinstance(value, bool):
|
||||
print(f" {check}: {'✓' if value else '✗'}")
|
||||
else:
|
||||
print(f" {check}: {value}")
|
||||
|
||||
if result["warnings"]:
|
||||
print()
|
||||
print("Warnings:")
|
||||
for warning in result["warnings"]:
|
||||
print(f" ⚠ {warning}")
|
||||
|
||||
if result["errors"]:
|
||||
print()
|
||||
print("Errors:")
|
||||
for error in result["errors"]:
|
||||
print(f" ✗ {error}")
|
||||
|
||||
sys.exit(0 if result["healthy"] else 1)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
@@ -44,34 +44,6 @@ from typing import Dict, Any, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _format_error(
|
||||
message: str,
|
||||
skill_name: str = None,
|
||||
file_path: str = None,
|
||||
suggestion: str = None,
|
||||
context: dict = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Format an error with rich context for better debugging."""
|
||||
parts = [message]
|
||||
if skill_name:
|
||||
parts.append(f"Skill: {skill_name}")
|
||||
if file_path:
|
||||
parts.append(f"File: {file_path}")
|
||||
if suggestion:
|
||||
parts.append(f"Suggestion: {suggestion}")
|
||||
if context:
|
||||
for key, value in context.items():
|
||||
parts.append(f"{key}: {value}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": " | ".join(parts),
|
||||
"skill_name": skill_name,
|
||||
"file_path": file_path,
|
||||
"suggestion": suggestion,
|
||||
}
|
||||
|
||||
|
||||
# Import security scanner — agent-created skills get the same scrutiny as
|
||||
# community hub installs.
|
||||
try:
|
||||
|
||||
Reference in New Issue
Block a user