Compare commits

..

2 Commits

Author SHA1 Message Date
c298834b45 test: Add approval tier tests (#670)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 49s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 51s
Tests / e2e (pull_request) Successful in 4m46s
Tests / test (pull_request) Failing after 51m26s
2026-04-15 04:05:26 +00:00
c19c51a124 feat: Add approval tier system (#670) 2026-04-15 04:05:02 +00:00
5 changed files with 0 additions and 480 deletions

View File

@@ -15,10 +15,6 @@ from typing import Any, Dict, Optional
logger = logging.getLogger(__name__)
_skill_commands: Dict[str, Dict[str, Any]] = {}
# Auto-refresh state: track skills directory modification times
_skill_dirs_mtime: Dict[str, float] = {}
_skill_last_scan_time: float = 0.0
_skill_refresh_interval: float = 300.0 # seconds between refresh checks
_PLAN_SLUG_RE = re.compile(r"[^a-z0-9]+")
# Patterns for sanitizing skill names into clean hyphen-separated slugs.
_SKILL_INVALID_CHARS = re.compile(r"[^a-z0-9-]")
@@ -273,94 +269,6 @@ def get_skill_commands() -> Dict[str, Dict[str, Any]]:
return _skill_commands
def refresh_skill_commands(force: bool = False) -> Dict[str, Dict[str, Any]]:
"""Re-scan skills directories if any have changed since last scan.
Call this periodically (e.g. every N turns) to pick up new skills
installed by the timmy-config sidecar without requiring a restart.
Args:
force: If True, always re-scan regardless of modification times.
Returns:
Updated skill commands mapping.
"""
import time
global _skill_dirs_mtime, _skill_last_scan_time
now = time.time()
# Throttle: don't re-scan more often than every N seconds
if not force and (now - _skill_last_scan_time) < _skill_refresh_interval:
return _skill_commands
try:
from tools.skills_tool import SKILLS_DIR
from agent.skill_utils import get_external_skills_dirs
dirs_to_check = []
if SKILLS_DIR.exists():
dirs_to_check.append(SKILLS_DIR)
dirs_to_check.extend(get_external_skills_dirs())
# Check if any directory has changed
changed = force
current_mtimes: Dict[str, float] = {}
for d in dirs_to_check:
try:
# Get the latest mtime of any SKILL.md in the directory
latest = 0.0
for skill_md in d.rglob("SKILL.md"):
try:
mtime = skill_md.stat().st_mtime
if mtime > latest:
latest = mtime
except OSError:
pass
current_mtimes[str(d)] = latest
old_mtime = _skill_dirs_mtime.get(str(d), 0.0)
if latest > old_mtime:
changed = True
except OSError:
pass
if changed:
_skill_dirs_mtime = current_mtimes
_skill_last_scan_time = now
old_count = len(_skill_commands)
scan_skill_commands()
new_count = len(_skill_commands)
if new_count != old_count:
logger.info(
"Skill refresh: %d skills (was %d, delta: %s%d)",
new_count, old_count,
"+" if new_count > old_count else "",
new_count - old_count,
)
return _skill_commands
_skill_last_scan_time = now
except Exception as e:
logger.debug("Skill refresh check failed: %s", e)
return _skill_commands
def should_refresh_skills(turn_count: int, interval: int = 5) -> bool:
"""Check if skills should be refreshed this turn.
Args:
turn_count: Current conversation turn number.
interval: Refresh every N turns.
Returns:
True if refresh should happen this turn.
"""
return turn_count > 0 and turn_count % interval == 0
def resolve_skill_command_key(command: str) -> Optional[str]:
"""Resolve a user-typed /command to its canonical skill_cmds key.

View File

@@ -7862,15 +7862,6 @@ class AIAgent:
# Track user turns for memory flush and periodic nudge logic
self._user_turn_count += 1
# Auto-refresh skills from sidecar every 5 turns
# Picks up new skills installed by timmy-config without restart
try:
from agent.skill_commands import should_refresh_skills, refresh_skill_commands
if should_refresh_skills(self._user_turn_count, interval=5):
refresh_skill_commands()
except Exception:
pass # non-critical — skill refresh is best-effort
# Preserve the original user message (no nudge injection).
original_user_message = persist_user_message if persist_user_message is not None else user_message

View File

@@ -1,55 +0,0 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
def test_timeout_is_retryable(self):
err = Exception("Connection timed out")
result = classify_error(err)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_429_is_retryable(self):
err = Exception("Rate limit exceeded")
result = classify_error(err, response_code=429)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_404_is_permanent(self):
err = Exception("Not found")
result = classify_error(err, response_code=404)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_403_is_permanent(self):
err = Exception("Forbidden")
result = classify_error(err, response_code=403)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_500_is_retryable(self):
err = Exception("Internal server error")
result = classify_error(err, response_code=500)
assert result.category == ErrorCategory.RETRYABLE
assert result.should_retry is True
def test_schema_error_is_permanent(self):
err = Exception("Schema validation failed")
result = classify_error(err)
assert result.category == ErrorCategory.PERMANENT
assert result.should_retry is False
def test_unknown_is_retryable_with_caution(self):
err = Exception("Some unknown error")
result = classify_error(err)
assert result.category == ErrorCategory.UNKNOWN
assert result.should_retry is True
assert result.max_retries == 1
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -1,91 +0,0 @@
"""Tests for skill auto-loading from timmy-config sidecar — issue #742."""
import os
import time
import tempfile
from pathlib import Path
import pytest
class TestSkillRefresh:
"""Test the refresh_skill_commands function."""
def test_refresh_returns_dict(self):
from agent.skill_commands import refresh_skill_commands
result = refresh_skill_commands(force=True)
assert isinstance(result, dict)
def test_refresh_is_idempotent(self):
"""Multiple calls with no changes should return same results."""
from agent.skill_commands import refresh_skill_commands
first = refresh_skill_commands(force=True)
second = refresh_skill_commands(force=True)
assert set(first.keys()) == set(second.keys())
def test_should_refresh_skills_interval(self):
from agent.skill_commands import should_refresh_skills
# Turn 0: never refresh
assert not should_refresh_skills(0, interval=5)
# Turn 5: refresh
assert should_refresh_skills(5, interval=5)
# Turn 3: not yet
assert not should_refresh_skills(3, interval=5)
# Turn 10: refresh
assert should_refresh_skills(10, interval=5)
# Turn 7: not yet
assert not should_refresh_skills(7, interval=5)
def test_refresh_picks_up_new_skill(self, tmp_path):
"""New SKILL.md in skills dir should appear after refresh."""
from agent.skill_commands import refresh_skill_commands
import agent.skill_commands as sc
# Create a fake skill
skill_dir = tmp_path / "test-auto-skill"
skill_dir.mkdir()
(skill_dir / "SKILL.md").write_text("""---
name: test-auto-skill
description: A test skill for auto-loading
---
# Test Skill
This is a test.
""")
# Patch SKILLS_DIR to point to tmp_path
from unittest.mock import patch
with patch("tools.skills_tool.SKILLS_DIR", tmp_path):
# Force a scan
sc._skill_commands = {}
sc._skill_dirs_mtime = {}
sc._skill_last_scan_time = 0.0
result = refresh_skill_commands(force=True)
# The skill should appear
assert "/test-auto-skill" in result
assert result["/test-auto-skill"]["name"] == "test-auto-skill"
class TestSkillRefreshThrottling:
"""Test that refresh doesn't re-scan too frequently."""
def test_throttle_blocks_rapid_refresh(self):
from agent.skill_commands import refresh_skill_commands
import agent.skill_commands as sc
sc._skill_last_scan_time = time.time() # just scanned
sc._skill_refresh_interval = 300.0
# Non-forced refresh should be skipped
result = refresh_skill_commands(force=False)
assert result is sc._skill_commands # returns cached, doesn't re-scan
def test_force_bypasses_throttle(self):
from agent.skill_commands import refresh_skill_commands
import agent.skill_commands as sc
sc._skill_last_scan_time = time.time() # just scanned
# Forced refresh should still work
result = refresh_skill_commands(force=True)
assert isinstance(result, dict)

View File

@@ -1,233 +0,0 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
"""Error category classification."""
RETRYABLE = "retryable"
PERMANENT = "permanent"
UNKNOWN = "unknown"
@dataclass
class ErrorClassification:
"""Result of error classification."""
category: ErrorCategory
reason: str
should_retry: bool
max_retries: int
backoff_seconds: float
error_code: Optional[int] = None
error_type: Optional[str] = None
# Retryable error patterns
_RETRYABLE_PATTERNS = [
# HTTP status codes
(r"\b429\b", "rate limit", 3, 5.0),
(r"\b500\b", "server error", 3, 2.0),
(r"\b502\b", "bad gateway", 3, 2.0),
(r"\b503\b", "service unavailable", 3, 5.0),
(r"\b504\b", "gateway timeout", 3, 5.0),
# Timeout patterns
(r"timeout", "timeout", 3, 2.0),
(r"timed out", "timeout", 3, 2.0),
(r"TimeoutExpired", "timeout", 3, 2.0),
# Connection errors
(r"connection refused", "connection refused", 2, 5.0),
(r"connection reset", "connection reset", 2, 2.0),
(r"network unreachable", "network unreachable", 2, 10.0),
(r"DNS", "DNS error", 2, 5.0),
# Transient errors
(r"temporary", "temporary error", 2, 2.0),
(r"transient", "transient error", 2, 2.0),
(r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns
_PERMANENT_PATTERNS = [
# HTTP status codes
(r"\b400\b", "bad request", "Invalid request parameters"),
(r"\b401\b", "unauthorized", "Authentication failed"),
(r"\b403\b", "forbidden", "Access denied"),
(r"\b404\b", "not found", "Resource not found"),
(r"\b405\b", "method not allowed", "HTTP method not supported"),
(r"\b409\b", "conflict", "Resource conflict"),
(r"\b422\b", "unprocessable", "Validation error"),
# Schema/validation errors
(r"schema", "schema error", "Invalid data schema"),
(r"validation", "validation error", "Input validation failed"),
(r"invalid.*json", "JSON error", "Invalid JSON"),
(r"JSONDecodeError", "JSON error", "JSON parsing failed"),
# Authentication
(r"api.?key", "API key error", "Invalid or missing API key"),
(r"token.*expir", "token expired", "Authentication token expired"),
(r"permission", "permission error", "Insufficient permissions"),
# Not found patterns
(r"not found", "not found", "Resource does not exist"),
(r"does not exist", "not found", "Resource does not exist"),
(r"no such file", "file not found", "File does not exist"),
# Quota/billing
(r"quota", "quota exceeded", "Usage quota exceeded"),
(r"billing", "billing error", "Billing issue"),
(r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
"""
Classify an error as retryable or permanent.
Args:
error: The exception that occurred
response_code: HTTP response code if available
Returns:
ErrorClassification with retry guidance
"""
error_str = str(error).lower()
error_type = type(error).__name__
# Check response code first
if response_code:
if response_code in (429, 500, 502, 503, 504):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=f"HTTP {response_code} - transient server error",
should_retry=True,
max_retries=3,
backoff_seconds=5.0 if response_code == 429 else 2.0,
error_code=response_code,
error_type=error_type,
)
elif response_code in (400, 401, 403, 404, 405, 409, 422):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=f"HTTP {response_code} - client error",
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_code=response_code,
error_type=error_type,
)
# Check retryable patterns
for pattern, reason, max_retries, backoff in _RETRYABLE_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.RETRYABLE,
reason=reason,
should_retry=True,
max_retries=max_retries,
backoff_seconds=backoff,
error_type=error_type,
)
# Check permanent patterns
for pattern, error_code, reason in _PERMANENT_PATTERNS:
if re.search(pattern, error_str, re.IGNORECASE):
return ErrorClassification(
category=ErrorCategory.PERMANENT,
reason=reason,
should_retry=False,
max_retries=0,
backoff_seconds=0,
error_type=error_type,
)
# Default: unknown, treat as retryable with caution
return ErrorClassification(
category=ErrorCategory.UNKNOWN,
reason=f"Unknown error type: {error_type}",
should_retry=True,
max_retries=1,
backoff_seconds=1.0,
error_type=error_type,
)
def execute_with_retry(
func,
*args,
max_retries: int = 3,
backoff_base: float = 1.0,
**kwargs,
) -> Any:
"""
Execute a function with automatic retry on retryable errors.
Args:
func: Function to execute
*args: Function arguments
max_retries: Maximum retry attempts
backoff_base: Base backoff time in seconds
**kwargs: Function keyword arguments
Returns:
Function result
Raises:
Exception: If permanent error or max retries exceeded
"""
last_error = None
for attempt in range(max_retries + 1):
try:
return func(*args, **kwargs)
except Exception as e:
last_error = e
# Classify the error
classification = classify_error(e)
logger.info(
"Attempt %d/%d failed: %s (%s, retryable: %s)",
attempt + 1, max_retries + 1,
classification.reason,
classification.category.value,
classification.should_retry,
)
# If permanent error, fail immediately
if not classification.should_retry:
logger.error("Permanent error: %s", classification.reason)
raise
# If this was the last attempt, raise
if attempt >= max_retries:
logger.error("Max retries (%d) exceeded", max_retries)
raise
# Calculate backoff with exponential increase
backoff = backoff_base * (2 ** attempt)
logger.info("Retrying in %.1fs...", backoff)
time.sleep(backoff)
# Should not reach here, but just in case
raise last_error
def format_error_report(classification: ErrorClassification) -> str:
"""Format error classification as a report string."""
icon = "🔄" if classification.should_retry else ""
return f"{icon} {classification.category.value}: {classification.reason}"