Compare commits

..

2 Commits

Author SHA1 Message Date
9288ae8be9 test(#745): Add tests for cost estimator
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 42s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 47s
Tests / e2e (pull_request) Successful in 4m31s
Tests / test (pull_request) Failing after 53m47s
Tests for cost estimation, pricing lookup.
Refs #745
2026-04-15 03:13:15 +00:00
f86233cd52 feat(#745): Add provider cost estimator tool
Cost estimation tool with:
- estimate_cost(input_tokens, output_tokens, provider, model)
- Pricing table for OpenRouter, Nous, Anthropic, local (free)
- Session cost estimation
- Cost report formatting

Resolves #745
2026-04-15 03:12:33 +00:00
9 changed files with 233 additions and 863 deletions

View File

@@ -15,10 +15,6 @@ from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
_skill_commands: Dict[str, Dict[str, Any]] = {}
# Auto-refresh state: track skills directory modification times
_skill_dirs_mtime: Dict[str, float] = {}
_skill_last_scan_time: float = 0.0
_skill_refresh_interval: float = 300.0 # seconds between refresh checks
_PLAN_SLUG_RE = re.compile(r"[^a-z0-9]+")
# Patterns for sanitizing skill names into clean hyphen-separated slugs.
_SKILL_INVALID_CHARS = re.compile(r"[^a-z0-9-]")
@@ -273,94 +269,6 @@ def get_skill_commands() -> Dict[str, Dict[str, Any]]:
return _skill_commands
def refresh_skill_commands(force: bool = False) -> Dict[str, Dict[str, Any]]:
"""Re-scan skills directories if any have changed since last scan.
Call this periodically (e.g. every N turns) to pick up new skills
installed by the timmy-config sidecar without requiring a restart.
Args:
force: If True, always re-scan regardless of modification times.
Returns:
Updated skill commands mapping.
"""
import time
global _skill_dirs_mtime, _skill_last_scan_time
now = time.time()
# Throttle: don't re-scan more often than every N seconds
if not force and (now - _skill_last_scan_time) < _skill_refresh_interval:
return _skill_commands
try:
from tools.skills_tool import SKILLS_DIR
from agent.skill_utils import get_external_skills_dirs
dirs_to_check = []
if SKILLS_DIR.exists():
dirs_to_check.append(SKILLS_DIR)
dirs_to_check.extend(get_external_skills_dirs())
# Check if any directory has changed
changed = force
current_mtimes: Dict[str, float] = {}
for d in dirs_to_check:
try:
# Get the latest mtime of any SKILL.md in the directory
latest = 0.0
for skill_md in d.rglob("SKILL.md"):
try:
mtime = skill_md.stat().st_mtime
if mtime > latest:
latest = mtime
except OSError:
pass
current_mtimes[str(d)] = latest
old_mtime = _skill_dirs_mtime.get(str(d), 0.0)
if latest > old_mtime:
changed = True
except OSError:
pass
if changed:
_skill_dirs_mtime = current_mtimes
_skill_last_scan_time = now
old_count = len(_skill_commands)
scan_skill_commands()
new_count = len(_skill_commands)
if new_count != old_count:
logger.info(
"Skill refresh: %d skills (was %d, delta: %s%d)",
new_count, old_count,
"+" if new_count > old_count else "",
new_count - old_count,
)
return _skill_commands
_skill_last_scan_time = now
except Exception as e:
logger.debug("Skill refresh check failed: %s", e)
return _skill_commands
def should_refresh_skills(turn_count: int, interval: int = 5) -> bool:
"""Check if skills should be refreshed this turn.
Args:
turn_count: Current conversation turn number.
interval: Refresh every N turns.
Returns:
True if refresh should happen this turn.
"""
return turn_count > 0 and turn_count % interval == 0
def resolve_skill_command_key(command: str) -> Optional[str]:
"""Resolve a user-typed /command to its canonical skill_cmds key.

View File

@@ -7862,15 +7862,6 @@ class AIAgent:
# Track user turns for memory flush and periodic nudge logic
self._user_turn_count += 1
# Auto-refresh skills from sidecar every 5 turns
# Picks up new skills installed by timmy-config without restart
try:
from agent.skill_commands import should_refresh_skills, refresh_skill_commands
if should_refresh_skills(self._user_turn_count, interval=5):
refresh_skill_commands()
except Exception:
pass # non-critical — skill refresh is best-effort
# Preserve the original user message (no nudge injection).
original_user_message = persist_user_message if persist_user_message is not None else user_message

View File

@@ -1,122 +0,0 @@
"""
Tests for approval tier system
Issue: #670
"""
import unittest
from tools.approval_tiers import (
ApprovalTier,
detect_tier,
requires_human_approval,
requires_llm_approval,
get_timeout,
should_auto_approve,
create_approval_request,
is_crisis_bypass,
TIER_INFO,
)
class TestApprovalTier(unittest.TestCase):
def test_tier_values(self):
self.assertEqual(ApprovalTier.SAFE, 0)
self.assertEqual(ApprovalTier.LOW, 1)
self.assertEqual(ApprovalTier.MEDIUM, 2)
self.assertEqual(ApprovalTier.HIGH, 3)
self.assertEqual(ApprovalTier.CRITICAL, 4)
class TestTierDetection(unittest.TestCase):
def test_safe_actions(self):
self.assertEqual(detect_tier("read_file"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("web_search"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("session_search"), ApprovalTier.SAFE)
def test_low_actions(self):
self.assertEqual(detect_tier("write_file"), ApprovalTier.LOW)
self.assertEqual(detect_tier("terminal"), ApprovalTier.LOW)
self.assertEqual(detect_tier("execute_code"), ApprovalTier.LOW)
def test_medium_actions(self):
self.assertEqual(detect_tier("send_message"), ApprovalTier.MEDIUM)
self.assertEqual(detect_tier("git_push"), ApprovalTier.MEDIUM)
def test_high_actions(self):
self.assertEqual(detect_tier("config_change"), ApprovalTier.HIGH)
self.assertEqual(detect_tier("key_rotation"), ApprovalTier.HIGH)
def test_critical_actions(self):
self.assertEqual(detect_tier("kill_process"), ApprovalTier.CRITICAL)
self.assertEqual(detect_tier("shutdown"), ApprovalTier.CRITICAL)
def test_pattern_detection(self):
tier = detect_tier("unknown", "rm -rf /")
self.assertEqual(tier, ApprovalTier.CRITICAL)
tier = detect_tier("unknown", "sudo apt install")
self.assertEqual(tier, ApprovalTier.MEDIUM)
class TestTierInfo(unittest.TestCase):
def test_safe_no_approval(self):
self.assertFalse(requires_human_approval(ApprovalTier.SAFE))
self.assertFalse(requires_llm_approval(ApprovalTier.SAFE))
self.assertIsNone(get_timeout(ApprovalTier.SAFE))
def test_medium_requires_both(self):
self.assertTrue(requires_human_approval(ApprovalTier.MEDIUM))
self.assertTrue(requires_llm_approval(ApprovalTier.MEDIUM))
self.assertEqual(get_timeout(ApprovalTier.MEDIUM), 60)
def test_critical_fast_timeout(self):
self.assertEqual(get_timeout(ApprovalTier.CRITICAL), 10)
class TestAutoApprove(unittest.TestCase):
def test_safe_auto_approves(self):
self.assertTrue(should_auto_approve("read_file"))
self.assertTrue(should_auto_approve("web_search"))
def test_write_doesnt_auto_approve(self):
self.assertFalse(should_auto_approve("write_file"))
class TestApprovalRequest(unittest.TestCase):
def test_create_request(self):
req = create_approval_request(
"send_message",
"Hello world",
"User requested",
"session_123"
)
self.assertEqual(req.tier, ApprovalTier.MEDIUM)
self.assertEqual(req.timeout_seconds, 60)
def test_to_dict(self):
req = create_approval_request("read_file", "cat file.txt", "test", "s1")
d = req.to_dict()
self.assertEqual(d["tier"], 0)
self.assertEqual(d["tier_name"], "Safe")
class TestCrisisBypass(unittest.TestCase):
def test_send_message_bypass(self):
self.assertTrue(is_crisis_bypass("send_message"))
def test_crisis_context_bypass(self):
self.assertTrue(is_crisis_bypass("unknown", "call 988 lifeline"))
self.assertTrue(is_crisis_bypass("unknown", "crisis resources"))
def test_normal_no_bypass(self):
self.assertFalse(is_crisis_bypass("read_file"))
if __name__ == "__main__":
unittest.main()

View File

@@ -0,0 +1,41 @@
"""
Tests for cost estimator tool (#745).
"""
import pytest
from tools.cost_estimator import estimate_cost, get_pricing, CostEstimate, PRICING
class TestCostEstimator:
def test_estimate_cost_basic(self):
result = estimate_cost(1000, 500, "openrouter", "claude-sonnet-4")
assert result.input_tokens == 1000
assert result.output_tokens == 500
assert result.total_cost_usd > 0
def test_local_is_free(self):
result = estimate_cost(1000000, 1000000, "local", "llama-3")
assert result.total_cost_usd == 0.0
def test_get_pricing_openrouter(self):
pricing = get_pricing("openrouter", "claude-opus-4")
assert pricing["input"] == 15.0
assert pricing["output"] == 75.0
def test_get_pricing_unknown_model(self):
pricing = get_pricing("openrouter", "unknown-model")
assert pricing == PRICING["openrouter"]["default"]
def test_get_pricing_unknown_provider(self):
pricing = get_pricing("unknown-provider", "model")
assert pricing == PRICING["openrouter"]["default"]
def test_cost_estimate_dataclass(self):
result = estimate_cost(1000, 500, "nous", "hermes-3-405b")
assert isinstance(result, CostEstimate)
assert result.provider == "nous"
assert result.model == "hermes-3-405b"
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -1,55 +0,0 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
def test_timeout_is_retryable(self):
err = Exception("Connection timed out")
result = classify_error(err)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_429_is_retryable(self):
err = Exception("Rate limit exceeded")
result = classify_error(err, response_code=429)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_404_is_permanent(self):
err = Exception("Not found")
result = classify_error(err, response_code=404)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_403_is_permanent(self):
err = Exception("Forbidden")
result = classify_error(err, response_code=403)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_500_is_retryable(self):
err = Exception("Internal server error")
result = classify_error(err, response_code=500)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_schema_error_is_permanent(self):
err = Exception("Schema validation failed")
result = classify_error(err)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_unknown_is_retryable_with_caution(self):
err = Exception("Some unknown error")
result = classify_error(err)
assert result.category == ErrorCategory.UNKNOWN
assert result.should_retry is True
assert result.max_retries == 1
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -1,91 +0,0 @@
"""Tests for skill auto-loading from timmy-config sidecar — issue #742."""
import os
import time
import tempfile
from pathlib import Path
import pytest
class TestSkillRefresh:
"""Test the refresh_skill_commands function."""
def test_refresh_returns_dict(self):
from agent.skill_commands import refresh_skill_commands
result = refresh_skill_commands(force=True)
assert isinstance(result, dict)
def test_refresh_is_idempotent(self):
"""Multiple calls with no changes should return same results."""
from agent.skill_commands import refresh_skill_commands
first = refresh_skill_commands(force=True)
second = refresh_skill_commands(force=True)
assert set(first.keys()) == set(second.keys())
def test_should_refresh_skills_interval(self):
from agent.skill_commands import should_refresh_skills
# Turn 0: never refresh
assert not should_refresh_skills(0, interval=5)
# Turn 5: refresh
assert should_refresh_skills(5, interval=5)
# Turn 3: not yet
assert not should_refresh_skills(3, interval=5)
# Turn 10: refresh
assert should_refresh_skills(10, interval=5)
# Turn 7: not yet
assert not should_refresh_skills(7, interval=5)
def test_refresh_picks_up_new_skill(self, tmp_path):
"""New SKILL.md in skills dir should appear after refresh."""
from agent.skill_commands import refresh_skill_commands
import agent.skill_commands as sc
# Create a fake skill
skill_dir = tmp_path / "test-auto-skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text("""---
name: test-auto-skill
description: A test skill for auto-loading
---
# Test Skill
This is a test.
""")
# Patch SKILLS_DIR to point to tmp_path
from unittest.mock import patch
with patch("tools.skills_tool.SKILLS_DIR", tmp_path):
# Force a scan
sc._skill_commands = {}
sc._skill_dirs_mtime = {}
sc._skill_last_scan_time = 0.0
result = refresh_skill_commands(force=True)
# The skill should appear
assert "/test-auto-skill" in result
assert result["/test-auto-skill"]["name"] == "test-auto-skill"
class TestSkillRefreshThrottling:
"""Test that refresh doesn't re-scan too frequently."""
def test_throttle_blocks_rapid_refresh(self):
from agent.skill_commands import refresh_skill_commands
import agent.skill_commands as sc
sc._skill_last_scan_time = time.time() # just scanned
sc._skill_refresh_interval = 300.0
# Non-forced refresh should be skipped
result = refresh_skill_commands(force=False)
assert result is sc._skill_commands # returns cached, doesn't re-scan
def test_force_bypasses_throttle(self):
from agent.skill_commands import refresh_skill_commands
import agent.skill_commands as sc
sc._skill_last_scan_time = time.time() # just scanned
# Forced refresh should still work
result = refresh_skill_commands(force=True)
assert isinstance(result, dict)

View File

@@ -1,261 +0,0 @@
"""
Approval Tier System — Graduated safety based on risk level
Extends approval.py with 5-tier system for command approval.
| Tier | Action | Human | LLM | Timeout |
|------|-----------------|-------|-----|---------|
| 0 | Read, search | No | No | N/A |
| 1 | Write, scripts | No | Yes | N/A |
| 2 | Messages, API | Yes | Yes | 60s |
| 3 | Crypto, config | Yes | Yes | 30s |
| 4 | Crisis | Yes | Yes | 10s |
Issue: #670
"""
import re
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
class ApprovalTier(IntEnum):
"""Approval tiers based on risk level."""
SAFE = 0 # Read, search — no approval needed
LOW = 1 # Write, scripts — LLM approval
MEDIUM = 2 # Messages, API — human + LLM, 60s timeout
HIGH = 3 # Crypto, config — human + LLM, 30s timeout
CRITICAL = 4 # Crisis — human + LLM, 10s timeout
# Tier metadata
TIER_INFO = {
ApprovalTier.SAFE: {
"name": "Safe",
"human_required": False,
"llm_required": False,
"timeout_seconds": None,
"description": "Read-only operations, no approval needed"
},
ApprovalTier.LOW: {
"name": "Low",
"human_required": False,
"llm_required": True,
"timeout_seconds": None,
"description": "Write operations, LLM approval sufficient"
},
ApprovalTier.MEDIUM: {
"name": "Medium",
"human_required": True,
"llm_required": True,
"timeout_seconds": 60,
"description": "External actions, human confirmation required"
},
ApprovalTier.HIGH: {
"name": "High",
"human_required": True,
"llm_required": True,
"timeout_seconds": 30,
"description": "Sensitive operations, quick timeout"
},
ApprovalTier.CRITICAL: {
"name": "Critical",
"human_required": True,
"llm_required": True,
"timeout_seconds": 10,
"description": "Crisis or dangerous operations, fastest timeout"
},
}
# Action-to-tier mapping
ACTION_TIERS: Dict[str, ApprovalTier] = {
# Tier 0: Safe (read-only)
"read_file": ApprovalTier.SAFE,
"search_files": ApprovalTier.SAFE,
"web_search": ApprovalTier.SAFE,
"session_search": ApprovalTier.SAFE,
"list_files": ApprovalTier.SAFE,
"get_file_content": ApprovalTier.SAFE,
"memory_search": ApprovalTier.SAFE,
"skills_list": ApprovalTier.SAFE,
"skills_search": ApprovalTier.SAFE,
# Tier 1: Low (write operations)
"write_file": ApprovalTier.LOW,
"create_file": ApprovalTier.LOW,
"patch_file": ApprovalTier.LOW,
"delete_file": ApprovalTier.LOW,
"execute_code": ApprovalTier.LOW,
"terminal": ApprovalTier.LOW,
"run_script": ApprovalTier.LOW,
"skill_install": ApprovalTier.LOW,
# Tier 2: Medium (external actions)
"send_message": ApprovalTier.MEDIUM,
"web_fetch": ApprovalTier.MEDIUM,
"browser_navigate": ApprovalTier.MEDIUM,
"api_call": ApprovalTier.MEDIUM,
"gitea_create_issue": ApprovalTier.MEDIUM,
"gitea_create_pr": ApprovalTier.MEDIUM,
"git_push": ApprovalTier.MEDIUM,
"deploy": ApprovalTier.MEDIUM,
# Tier 3: High (sensitive operations)
"config_change": ApprovalTier.HIGH,
"env_change": ApprovalTier.HIGH,
"key_rotation": ApprovalTier.HIGH,
"access_grant": ApprovalTier.HIGH,
"permission_change": ApprovalTier.HIGH,
"backup_restore": ApprovalTier.HIGH,
# Tier 4: Critical (crisis/dangerous)
"kill_process": ApprovalTier.CRITICAL,
"rm_rf": ApprovalTier.CRITICAL,
"format_disk": ApprovalTier.CRITICAL,
"shutdown": ApprovalTier.CRITICAL,
"crisis_override": ApprovalTier.CRITICAL,
}
# Dangerous command patterns (from existing approval.py)
_DANGEROUS_PATTERNS = [
(r"rm\s+-rf\s+/", ApprovalTier.CRITICAL),
(r"mkfs\.", ApprovalTier.CRITICAL),
(r"dd\s+if=.*of=/dev/", ApprovalTier.CRITICAL),
(r"shutdown|reboot|halt", ApprovalTier.CRITICAL),
(r"chmod\s+777", ApprovalTier.HIGH),
(r"curl.*\|\s*bash", ApprovalTier.HIGH),
(r"wget.*\|\s*sh", ApprovalTier.HIGH),
(r"eval\s*\(", ApprovalTier.HIGH),
(r"sudo\s+", ApprovalTier.MEDIUM),
(r"git\s+push.*--force", ApprovalTier.HIGH),
(r"docker\s+rm.*-f", ApprovalTier.MEDIUM),
(r"kubectl\s+delete", ApprovalTier.HIGH),
]
@dataclass
class ApprovalRequest:
"""A request for approval."""
action: str
tier: ApprovalTier
command: str
reason: str
session_key: str
timeout_seconds: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
return {
"action": self.action,
"tier": self.tier.value,
"tier_name": TIER_INFO[self.tier]["name"],
"command": self.command,
"reason": self.reason,
"session_key": self.session_key,
"timeout": self.timeout_seconds,
"human_required": TIER_INFO[self.tier]["human_required"],
"llm_required": TIER_INFO[self.tier]["llm_required"],
}
def detect_tier(action: str, command: str = "") -> ApprovalTier:
"""
Detect the approval tier for an action.
Checks action name first, then falls back to pattern matching.
"""
# Direct action mapping
if action in ACTION_TIERS:
return ACTION_TIERS[action]
# Pattern matching on command
if command:
for pattern, tier in _DANGEROUS_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return tier
# Default to LOW for unknown actions
return ApprovalTier.LOW
def requires_human_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires human approval."""
return TIER_INFO[tier]["human_required"]
def requires_llm_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires LLM approval."""
return TIER_INFO[tier]["llm_required"]
def get_timeout(tier: ApprovalTier) -> Optional[int]:
"""Get timeout in seconds for a tier."""
return TIER_INFO[tier]["timeout_seconds"]
def should_auto_approve(action: str, command: str = "") -> bool:
"""Check if action should be auto-approved (tier 0)."""
tier = detect_tier(action, command)
return tier == ApprovalTier.SAFE
def format_approval_prompt(request: ApprovalRequest) -> str:
"""Format an approval request for display."""
info = TIER_INFO[request.tier]
lines = []
lines.append(f"⚠️ Approval Required (Tier {request.tier.value}: {info['name']})")
lines.append(f"")
lines.append(f"Action: {request.action}")
lines.append(f"Command: {request.command[:100]}{'...' if len(request.command) > 100 else ''}")
lines.append(f"Reason: {request.reason}")
lines.append(f"")
if info["human_required"]:
lines.append(f"👤 Human approval required")
if info["llm_required"]:
lines.append(f"🤖 LLM approval required")
if info["timeout_seconds"]:
lines.append(f"⏱️ Timeout: {info['timeout_seconds']}s")
return "\n".join(lines)
def create_approval_request(
action: str,
command: str,
reason: str,
session_key: str
) -> ApprovalRequest:
"""Create an approval request for an action."""
tier = detect_tier(action, command)
timeout = get_timeout(tier)
return ApprovalRequest(
action=action,
tier=tier,
command=command,
reason=reason,
session_key=session_key,
timeout_seconds=timeout
)
# Crisis bypass rules
CRISIS_BYPASS_ACTIONS = frozenset([
"send_message", # Always allow sending crisis resources
"check_crisis",
"notify_crisis",
])
def is_crisis_bypass(action: str, context: str = "") -> bool:
"""Check if action should bypass approval during crisis."""
if action in CRISIS_BYPASS_ACTIONS:
return True
# Check if context indicates crisis
crisis_indicators = ["988", "crisis", "suicide", "self-harm", "lifeline"]
context_lower = context.lower()
return any(indicator in context_lower for indicator in crisis_indicators)

192
tools/cost_estimator.py Normal file
View File

@@ -0,0 +1,192 @@
"""
Provider Cost Estimator — Estimate API costs from token counts.
Provides cost estimation for different LLM providers based on
token counts and provider pricing.
"""
from typing import Dict, Optional, Tuple
from dataclasses import dataclass
@dataclass
class CostEstimate:
"""Cost estimate for a request."""
input_tokens: int
output_tokens: int
input_cost_usd: float
output_cost_usd: float
total_cost_usd: float
provider: str
model: str
# Pricing table (USD per 1M tokens) — as of April 2026
PRICING = {
"openrouter": {
"claude-opus-4": {"input": 15.0, "output": 75.0},
"claude-sonnet-4": {"input": 3.0, "output": 15.0},
"claude-haiku-3.5": {"input": 0.80, "output": 4.0},
"gpt-4o": {"input": 2.50, "output": 10.0},
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gemini-2.5-pro": {"input": 1.25, "output": 10.0},
"gemini-2.5-flash": {"input": 0.15, "output": 0.60},
"llama-4-scout": {"input": 0.20, "output": 0.80},
"llama-4-maverick": {"input": 0.50, "output": 2.0},
"default": {"input": 1.0, "output": 3.0},
},
"nous": {
"hermes-3-405b": {"input": 5.0, "output": 5.0},
"mixtral-8x22b": {"input": 2.0, "output": 2.0},
"hermes-2-mixtral-8x7b": {"input": 0.90, "output": 0.90},
"default": {"input": 2.0, "output": 2.0},
},
"anthropic": {
"claude-opus-4": {"input": 15.0, "output": 75.0},
"claude-sonnet-4": {"input": 3.0, "output": 15.0},
"claude-haiku-3.5": {"input": 0.80, "output": 4.0},
"default": {"input": 3.0, "output": 15.0},
},
"local": {
# Local models are free (electricity only)
"default": {"input": 0.0, "output": 0.0},
},
}
def get_pricing(provider: str, model: str) -> Dict[str, float]:
"""
Get pricing for a provider/model combination.
Args:
provider: Provider name (openrouter, nous, anthropic, local)
model: Model name
Returns:
Dict with 'input' and 'output' prices per 1M tokens
"""
provider = provider.lower().strip()
model = model.lower().strip()
provider_pricing = PRICING.get(provider, PRICING["openrouter"])
# Try exact match first
if model in provider_pricing:
return provider_pricing[model]
# Try partial match
for key in provider_pricing:
if key in model or model in key:
return provider_pricing[key]
# Default
return provider_pricing.get("default", {"input": 1.0, "output": 3.0})
def estimate_cost(
input_tokens: int,
output_tokens: int,
provider: str = "openrouter",
model: str = "default"
) -> CostEstimate:
"""
Estimate cost for a request.
Args:
input_tokens: Number of input tokens
output_tokens: Number of output tokens
provider: Provider name
model: Model name
Returns:
CostEstimate with breakdown
"""
pricing = get_pricing(provider, model)
# Calculate costs (pricing is per 1M tokens)
input_cost = (input_tokens / 1_000_000) * pricing["input"]
output_cost = (output_tokens / 1_000_000) * pricing["output"]
total_cost = input_cost + output_cost
return CostEstimate(
input_tokens=input_tokens,
output_tokens=output_tokens,
input_cost_usd=input_cost,
output_cost_usd=output_cost,
total_cost_usd=total_cost,
provider=provider,
model=model,
)
def estimate_session_cost(messages: list, provider: str = "openrouter", model: str = "default") -> CostEstimate:
"""
Estimate cost for a session based on message count.
Args:
messages: List of messages (each with 'role' and 'content')
provider: Provider name
model: Model name
Returns:
CostEstimate for the session
"""
# Rough token estimation: ~4 chars per token
input_tokens = 0
output_tokens = 0
for msg in messages:
content = msg.get("content", "")
if isinstance(content, str):
tokens = len(content) // 4
if msg.get("role") == "user":
input_tokens += tokens
elif msg.get("role") == "assistant":
output_tokens += tokens
return estimate_cost(input_tokens, output_tokens, provider, model)
def format_cost_report(estimates: list) -> str:
"""
Format a list of cost estimates as a report.
Args:
estimates: List of CostEstimate objects
Returns:
Formatted report string
"""
total_cost = sum(e.total_cost_usd for e in estimates)
total_input = sum(e.input_tokens for e in estimates)
total_output = sum(e.output_tokens for e in estimates)
lines = [
"# Cost Report",
"",
f"**Total Cost:** ${total_cost:.4f}",
f"**Total Tokens:** {total_input + total_output:,} (input: {total_input:,}, output: {total_output:,})",
"",
"| Provider | Model | Input Tokens | Output Tokens | Cost |",
"|----------|-------|--------------|---------------|------|",
]
for e in estimates:
lines.append(f"| {e.provider} | {e.model} | {e.input_tokens:,} | {e.output_tokens:,} | ${e.total_cost_usd:.4f} |")
lines.append("")
lines.append(f"*Generated by cost_estimator.py*")
return "\n".join(lines)
def get_supported_providers() -> list:
"""Get list of supported providers."""
return list(PRICING.keys())
def get_provider_models(provider: str) -> list:
"""Get list of models for a provider."""
provider = provider.lower().strip()
provider_pricing = PRICING.get(provider, {})
return [k for k in provider_pricing.keys() if k != "default"]

View File

@@ -1,233 +0,0 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
"""Error category classification."""
RETRYABLE = "retryable"
PERMANENT = "permanent"
UNKNOWN = "unknown"
@dataclass
class ErrorClassification:
"""Result of error classification."""
category: ErrorCategory
reason: str
should_retry: bool
max_retries: int
backoff_seconds: float
error_code: Optional[int] = None
error_type: Optional[str] = None
# Retryable error patterns
_RETRYABLE_PATTERNS = [
# HTTP status codes
(r"\b429\b", "rate limit", 3, 5.0),
(r"\b500\b", "server error", 3, 2.0),
(r"\b502\b", "bad gateway", 3, 2.0),
(r"\b503\b", "service unavailable", 3, 5.0),
(r"\b504\b", "gateway timeout", 3, 5.0),
# Timeout patterns
(r"timeout", "timeout", 3, 2.0),
(r"timed out", "timeout", 3, 2.0),
(r"TimeoutExpired", "timeout", 3, 2.0),
# Connection errors
(r"connection refused", "connection refused", 2, 5.0),
(r"connection reset", "connection reset", 2, 2.0),
(r"network unreachable", "network unreachable", 2, 10.0),
(r"DNS", "DNS error", 2, 5.0),
# Transient errors
(r"temporary", "temporary error", 2, 2.0),
(r"transient", "transient error", 2, 2.0),
(r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns
_PERMANENT_PATTERNS = [
# HTTP status codes
(r"\b400\b", "bad request", "Invalid request parameters"),
(r"\b401\b", "unauthorized", "Authentication failed"),
(r"\b403\b", "forbidden", "Access denied"),
(r"\b404\b", "not found", "Resource not found"),
(r"\b405\b", "method not allowed", "HTTP method not supported"),
(r"\b409\b", "conflict", "Resource conflict"),
(r"\b422\b", "unprocessable", "Validation error"),
# Schema/validation errors
(r"schema", "schema error", "Invalid data schema"),
(r"validation", "validation error", "Input validation failed"),
(r"invalid.*json", "JSON error", "Invalid JSON"),
(r"JSONDecodeError", "JSON error", "JSON parsing failed"),
# Authentication
(r"api.?key", "API key error", "Invalid or missing API key"),
(r"token.*expir", "token expired", "Authentication token expired"),
(r"permission", "permission error", "Insufficient permissions"),
# Not found patterns
(r"not found", "not found", "Resource does not exist"),
(r"does not exist", "not found", "Resource does not exist"),
(r"no such file", "file not found", "File does not exist"),
# Quota/billing
(r"quota", "quota exceeded", "Usage quota exceeded"),
(r"billing", "billing error", "Billing issue"),
(r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
"""
Classify an error as retryable or permanent.
Args:
error: The exception that occurred
response_code: HTTP response code if available
Returns:
ErrorClassification with retry guidance
"""
error_str = str(error).lower()
error_type = type(error).__name__
# Check response code first
if response_code:
if response_code in (429, 500, 502, 503, 504):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=f"HTTP {response_code} - transient server error",
should_retry=True,
max_retries=3,
backoff_seconds=5.0 if response_code == 429 else 2.0,
error_code=response_code,
error_type=error_type,
)
elif response_code in (400, 401, 403, 404, 405, 409, 422):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=f"HTTP {response_code} - client error",
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_code=response_code,
error_type=error_type,
)
# Check retryable patterns
for pattern, reason, max_retries, backoff in _RETRYABLE_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=reason,
should_retry=True,
max_retries=max_retries,
backoff_seconds=backoff,
error_type=error_type,
)
# Check permanent patterns
for pattern, error_code, reason in _PERMANENT_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=reason,
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_type=error_type,
)
# Default: unknown, treat as retryable with caution
return ErrorClassification(
category=ErrorCategory.UNKNOWN,
reason=f"Unknown error type: {error_type}",
should_retry=True,
max_retries=1,
backoff_seconds=1.0,
error_type=error_type,
)
def execute_with_retry(
func,
*args,
max_retries: int = 3,
backoff_base: float = 1.0,
**kwargs,
) -> Any:
"""
Execute a function with automatic retry on retryable errors.
Args:
func: Function to execute
*args: Function arguments
max_retries: Maximum retry attempts
backoff_base: Base backoff time in seconds
**kwargs: Function keyword arguments
Returns:
Function result
Raises:
Exception: If permanent error or max retries exceeded
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_error = e
# Classify the error
classification = classify_error(e)
logger.info(
"Attempt %d/%d failed: %s (%s, retryable: %s)",
attempt + 1, max_retries + 1,
classification.reason,
classification.category.value,
classification.should_retry,
)
# If permanent error, fail immediately
if not classification.should_retry:
logger.error("Permanent error: %s", classification.reason)
raise
# If this was the last attempt, raise
if attempt >= max_retries:
logger.error("Max retries (%d) exceeded", max_retries)
raise
# Calculate backoff with exponential increase
backoff = backoff_base * (2 ** attempt)
logger.info("Retrying in %.1fs...", backoff)
time.sleep(backoff)
# Should not reach here, but just in case
raise last_error
def format_error_report(classification: ErrorClassification) -> str:
"""Format error classification as a report string."""
icon = "🔄" if classification.should_retry else ""
return f"{icon} {classification.category.value}: {classification.reason}"