Compare commits

..

2 Commits

Author SHA1 Message Date
c298834b45 test: Add approval tier tests (#670)
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 49s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 51s
Tests / e2e (pull_request) Successful in 4m46s
Tests / test (pull_request) Failing after 51m26s
2026-04-15 04:05:26 +00:00
c19c51a124 feat: Add approval tier system (#670) 2026-04-15 04:05:02 +00:00
4 changed files with 383 additions and 266 deletions

View File

@@ -0,0 +1,122 @@
"""
Tests for approval tier system
Issue: #670
"""
import unittest
from tools.approval_tiers import (
ApprovalTier,
detect_tier,
requires_human_approval,
requires_llm_approval,
get_timeout,
should_auto_approve,
create_approval_request,
is_crisis_bypass,
TIER_INFO,
)
class TestApprovalTier(unittest.TestCase):
def test_tier_values(self):
self.assertEqual(ApprovalTier.SAFE, 0)
self.assertEqual(ApprovalTier.LOW, 1)
self.assertEqual(ApprovalTier.MEDIUM, 2)
self.assertEqual(ApprovalTier.HIGH, 3)
self.assertEqual(ApprovalTier.CRITICAL, 4)
class TestTierDetection(unittest.TestCase):
def test_safe_actions(self):
self.assertEqual(detect_tier("read_file"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("web_search"), ApprovalTier.SAFE)
self.assertEqual(detect_tier("session_search"), ApprovalTier.SAFE)
def test_low_actions(self):
self.assertEqual(detect_tier("write_file"), ApprovalTier.LOW)
self.assertEqual(detect_tier("terminal"), ApprovalTier.LOW)
self.assertEqual(detect_tier("execute_code"), ApprovalTier.LOW)
def test_medium_actions(self):
self.assertEqual(detect_tier("send_message"), ApprovalTier.MEDIUM)
self.assertEqual(detect_tier("git_push"), ApprovalTier.MEDIUM)
def test_high_actions(self):
self.assertEqual(detect_tier("config_change"), ApprovalTier.HIGH)
self.assertEqual(detect_tier("key_rotation"), ApprovalTier.HIGH)
def test_critical_actions(self):
self.assertEqual(detect_tier("kill_process"), ApprovalTier.CRITICAL)
self.assertEqual(detect_tier("shutdown"), ApprovalTier.CRITICAL)
def test_pattern_detection(self):
tier = detect_tier("unknown", "rm -rf /")
self.assertEqual(tier, ApprovalTier.CRITICAL)
tier = detect_tier("unknown", "sudo apt install")
self.assertEqual(tier, ApprovalTier.MEDIUM)
class TestTierInfo(unittest.TestCase):
def test_safe_no_approval(self):
self.assertFalse(requires_human_approval(ApprovalTier.SAFE))
self.assertFalse(requires_llm_approval(ApprovalTier.SAFE))
self.assertIsNone(get_timeout(ApprovalTier.SAFE))
def test_medium_requires_both(self):
self.assertTrue(requires_human_approval(ApprovalTier.MEDIUM))
self.assertTrue(requires_llm_approval(ApprovalTier.MEDIUM))
self.assertEqual(get_timeout(ApprovalTier.MEDIUM), 60)
def test_critical_fast_timeout(self):
self.assertEqual(get_timeout(ApprovalTier.CRITICAL), 10)
class TestAutoApprove(unittest.TestCase):
def test_safe_auto_approves(self):
self.assertTrue(should_auto_approve("read_file"))
self.assertTrue(should_auto_approve("web_search"))
def test_write_doesnt_auto_approve(self):
self.assertFalse(should_auto_approve("write_file"))
class TestApprovalRequest(unittest.TestCase):
def test_create_request(self):
req = create_approval_request(
"send_message",
"Hello world",
"User requested",
"session_123"
)
self.assertEqual(req.tier, ApprovalTier.MEDIUM)
self.assertEqual(req.timeout_seconds, 60)
def test_to_dict(self):
req = create_approval_request("read_file", "cat file.txt", "test", "s1")
d = req.to_dict()
self.assertEqual(d["tier"], 0)
self.assertEqual(d["tier_name"], "Safe")
class TestCrisisBypass(unittest.TestCase):
def test_send_message_bypass(self):
self.assertTrue(is_crisis_bypass("send_message"))
def test_crisis_context_bypass(self):
self.assertTrue(is_crisis_bypass("unknown", "call 988 lifeline"))
self.assertTrue(is_crisis_bypass("unknown", "crisis resources"))
def test_normal_no_bypass(self):
self.assertFalse(is_crisis_bypass("read_file"))
if __name__ == "__main__":
unittest.main()

View File

@@ -1,81 +0,0 @@
"""
Tests for skill dependency resolver
Issue: #754
"""
import unittest
from unittest.mock import patch, MagicMock
from tools.skill_deps import (
parse_requires,
check_dependency,
check_dependencies,
resolve_dependencies,
)
class TestParseRequires(unittest.TestCase):
def test_list(self):
fm = {"requires": ["pkg1", "pkg2"]}
self.assertEqual(parse_requires(fm), ["pkg1", "pkg2"])
def test_string(self):
fm = {"requires": "pkg1"}
self.assertEqual(parse_requires(fm), ["pkg1"])
def test_empty(self):
fm = {}
self.assertEqual(parse_requires(fm), [])
def test_none(self):
fm = {"requires": None}
self.assertEqual(parse_requires(fm), [])
class TestCheckDependency(unittest.TestCase):
def test_installed(self):
installed, version = check_dependency("json")
self.assertTrue(installed)
def test_not_installed(self):
installed, version = check_dependency("nonexistent_package_xyz_123")
self.assertFalse(installed)
class TestCheckDependencies(unittest.TestCase):
def test_all_installed(self):
installed, missing = check_dependencies(["json", "os"])
self.assertEqual(len(missing), 0)
def test_some_missing(self):
installed, missing = check_dependencies(["json", "nonexistent_xyz"])
self.assertIn("json", installed)
self.assertIn("nonexistent_xyz", missing)
class TestResolveDependencies(unittest.TestCase):
def test_no_requires(self):
satisfied, installed, missing = resolve_dependencies({})
self.assertTrue(satisfied)
def test_all_satisfied(self):
satisfied, installed, missing = resolve_dependencies(
{"requires": ["json"]}, auto_install=False
)
self.assertTrue(satisfied)
def test_missing_no_auto(self):
satisfied, installed, missing = resolve_dependencies(
{"requires": ["nonexistent_xyz"]}, auto_install=False
)
self.assertFalse(satisfied)
self.assertIn("nonexistent_xyz", missing)
if __name__ == "__main__":
unittest.main()

261
tools/approval_tiers.py Normal file
View File

@@ -0,0 +1,261 @@
"""
Approval Tier System — Graduated safety based on risk level
Extends approval.py with 5-tier system for command approval.
| Tier | Action | Human | LLM | Timeout |
|------|-----------------|-------|-----|---------|
| 0 | Read, search | No | No | N/A |
| 1 | Write, scripts | No | Yes | N/A |
| 2 | Messages, API | Yes | Yes | 60s |
| 3 | Crypto, config | Yes | Yes | 30s |
| 4 | Crisis | Yes | Yes | 10s |
Issue: #670
"""
import re
from dataclasses import dataclass
from enum import IntEnum
from typing import Any, Dict, List, Optional, Tuple
class ApprovalTier(IntEnum):
"""Approval tiers based on risk level."""
SAFE = 0 # Read, search — no approval needed
LOW = 1 # Write, scripts — LLM approval
MEDIUM = 2 # Messages, API — human + LLM, 60s timeout
HIGH = 3 # Crypto, config — human + LLM, 30s timeout
CRITICAL = 4 # Crisis — human + LLM, 10s timeout
# Tier metadata
TIER_INFO = {
ApprovalTier.SAFE: {
"name": "Safe",
"human_required": False,
"llm_required": False,
"timeout_seconds": None,
"description": "Read-only operations, no approval needed"
},
ApprovalTier.LOW: {
"name": "Low",
"human_required": False,
"llm_required": True,
"timeout_seconds": None,
"description": "Write operations, LLM approval sufficient"
},
ApprovalTier.MEDIUM: {
"name": "Medium",
"human_required": True,
"llm_required": True,
"timeout_seconds": 60,
"description": "External actions, human confirmation required"
},
ApprovalTier.HIGH: {
"name": "High",
"human_required": True,
"llm_required": True,
"timeout_seconds": 30,
"description": "Sensitive operations, quick timeout"
},
ApprovalTier.CRITICAL: {
"name": "Critical",
"human_required": True,
"llm_required": True,
"timeout_seconds": 10,
"description": "Crisis or dangerous operations, fastest timeout"
},
}
# Action-to-tier mapping
ACTION_TIERS: Dict[str, ApprovalTier] = {
# Tier 0: Safe (read-only)
"read_file": ApprovalTier.SAFE,
"search_files": ApprovalTier.SAFE,
"web_search": ApprovalTier.SAFE,
"session_search": ApprovalTier.SAFE,
"list_files": ApprovalTier.SAFE,
"get_file_content": ApprovalTier.SAFE,
"memory_search": ApprovalTier.SAFE,
"skills_list": ApprovalTier.SAFE,
"skills_search": ApprovalTier.SAFE,
# Tier 1: Low (write operations)
"write_file": ApprovalTier.LOW,
"create_file": ApprovalTier.LOW,
"patch_file": ApprovalTier.LOW,
"delete_file": ApprovalTier.LOW,
"execute_code": ApprovalTier.LOW,
"terminal": ApprovalTier.LOW,
"run_script": ApprovalTier.LOW,
"skill_install": ApprovalTier.LOW,
# Tier 2: Medium (external actions)
"send_message": ApprovalTier.MEDIUM,
"web_fetch": ApprovalTier.MEDIUM,
"browser_navigate": ApprovalTier.MEDIUM,
"api_call": ApprovalTier.MEDIUM,
"gitea_create_issue": ApprovalTier.MEDIUM,
"gitea_create_pr": ApprovalTier.MEDIUM,
"git_push": ApprovalTier.MEDIUM,
"deploy": ApprovalTier.MEDIUM,
# Tier 3: High (sensitive operations)
"config_change": ApprovalTier.HIGH,
"env_change": ApprovalTier.HIGH,
"key_rotation": ApprovalTier.HIGH,
"access_grant": ApprovalTier.HIGH,
"permission_change": ApprovalTier.HIGH,
"backup_restore": ApprovalTier.HIGH,
# Tier 4: Critical (crisis/dangerous)
"kill_process": ApprovalTier.CRITICAL,
"rm_rf": ApprovalTier.CRITICAL,
"format_disk": ApprovalTier.CRITICAL,
"shutdown": ApprovalTier.CRITICAL,
"crisis_override": ApprovalTier.CRITICAL,
}
# Dangerous command patterns (from existing approval.py)
_DANGEROUS_PATTERNS = [
(r"rm\s+-rf\s+/", ApprovalTier.CRITICAL),
(r"mkfs\.", ApprovalTier.CRITICAL),
(r"dd\s+if=.*of=/dev/", ApprovalTier.CRITICAL),
(r"shutdown|reboot|halt", ApprovalTier.CRITICAL),
(r"chmod\s+777", ApprovalTier.HIGH),
(r"curl.*\|\s*bash", ApprovalTier.HIGH),
(r"wget.*\|\s*sh", ApprovalTier.HIGH),
(r"eval\s*\(", ApprovalTier.HIGH),
(r"sudo\s+", ApprovalTier.MEDIUM),
(r"git\s+push.*--force", ApprovalTier.HIGH),
(r"docker\s+rm.*-f", ApprovalTier.MEDIUM),
(r"kubectl\s+delete", ApprovalTier.HIGH),
]
@dataclass
class ApprovalRequest:
"""A request for approval."""
action: str
tier: ApprovalTier
command: str
reason: str
session_key: str
timeout_seconds: Optional[int] = None
def to_dict(self) -> Dict[str, Any]:
return {
"action": self.action,
"tier": self.tier.value,
"tier_name": TIER_INFO[self.tier]["name"],
"command": self.command,
"reason": self.reason,
"session_key": self.session_key,
"timeout": self.timeout_seconds,
"human_required": TIER_INFO[self.tier]["human_required"],
"llm_required": TIER_INFO[self.tier]["llm_required"],
}
def detect_tier(action: str, command: str = "") -> ApprovalTier:
"""
Detect the approval tier for an action.
Checks action name first, then falls back to pattern matching.
"""
# Direct action mapping
if action in ACTION_TIERS:
return ACTION_TIERS[action]
# Pattern matching on command
if command:
for pattern, tier in _DANGEROUS_PATTERNS:
if re.search(pattern, command, re.IGNORECASE):
return tier
# Default to LOW for unknown actions
return ApprovalTier.LOW
def requires_human_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires human approval."""
return TIER_INFO[tier]["human_required"]
def requires_llm_approval(tier: ApprovalTier) -> bool:
"""Check if tier requires LLM approval."""
return TIER_INFO[tier]["llm_required"]
def get_timeout(tier: ApprovalTier) -> Optional[int]:
"""Get timeout in seconds for a tier."""
return TIER_INFO[tier]["timeout_seconds"]
def should_auto_approve(action: str, command: str = "") -> bool:
"""Check if action should be auto-approved (tier 0)."""
tier = detect_tier(action, command)
return tier == ApprovalTier.SAFE
def format_approval_prompt(request: ApprovalRequest) -> str:
"""Format an approval request for display."""
info = TIER_INFO[request.tier]
lines = []
lines.append(f"⚠️ Approval Required (Tier {request.tier.value}: {info['name']})")
lines.append(f"")
lines.append(f"Action: {request.action}")
lines.append(f"Command: {request.command[:100]}{'...' if len(request.command) > 100 else ''}")
lines.append(f"Reason: {request.reason}")
lines.append(f"")
if info["human_required"]:
lines.append(f"👤 Human approval required")
if info["llm_required"]:
lines.append(f"🤖 LLM approval required")
if info["timeout_seconds"]:
lines.append(f"⏱️ Timeout: {info['timeout_seconds']}s")
return "\n".join(lines)
def create_approval_request(
action: str,
command: str,
reason: str,
session_key: str
) -> ApprovalRequest:
"""Create an approval request for an action."""
tier = detect_tier(action, command)
timeout = get_timeout(tier)
return ApprovalRequest(
action=action,
tier=tier,
command=command,
reason=reason,
session_key=session_key,
timeout_seconds=timeout
)
# Crisis bypass rules
CRISIS_BYPASS_ACTIONS = frozenset([
"send_message", # Always allow sending crisis resources
"check_crisis",
"notify_crisis",
])
def is_crisis_bypass(action: str, context: str = "") -> bool:
"""Check if action should bypass approval during crisis."""
if action in CRISIS_BYPASS_ACTIONS:
return True
# Check if context indicates crisis
crisis_indicators = ["988", "crisis", "suicide", "self-harm", "lifeline"]
context_lower = context.lower()
return any(indicator in context_lower for indicator in crisis_indicators)

View File

@@ -1,185 +0,0 @@
"""
Skill Dependency Resolver — Auto-install missing dependencies
Checks skill frontmatter for `requires` field and ensures
dependencies are installed before loading the skill.
Issue: #754
"""
import importlib
import logging
import subprocess
import sys
from typing import Any, Dict, List, Optional, Tuple
logger = logging.getLogger(__name__)
def parse_requires(frontmatter: Dict[str, Any]) -> List[str]:
"""
Parse the `requires` field from skill frontmatter.
Supports:
- requires: [package1, package2]
- requires: package1
- requires:
- package1
- package2
"""
requires = frontmatter.get("requires", [])
if isinstance(requires, str):
return [requires]
if isinstance(requires, list):
return [str(r) for r in requires if r]
return []
def check_dependency(package: str) -> Tuple[bool, str]:
"""
Check if a Python package is installed.
Returns:
Tuple of (is_installed, version_or_error)
"""
# Handle pip package names (e.g., "matrix-nio[e2e]")
import_name = package.split("[")[0].replace("-", "_")
try:
mod = importlib.import_module(import_name)
version = getattr(mod, "__version__", "installed")
return True, version
except ImportError:
return False, "not installed"
def check_dependencies(requires: List[str]) -> Tuple[List[str], List[str]]:
"""
Check which dependencies are missing.
Returns:
Tuple of (installed, missing)
"""
installed = []
missing = []
for pkg in requires:
is_installed, _ = check_dependency(pkg)
if is_installed:
installed.append(pkg)
else:
missing.append(pkg)
return installed, missing
def install_dependency(package: str, quiet: bool = False) -> Tuple[bool, str]:
"""
Install a Python package via pip.
Returns:
Tuple of (success, output_or_error)
"""
try:
cmd = [sys.executable, "-m", "pip", "install", package]
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=120
)
if result.returncode == 0:
if not quiet:
logger.info("Installed %s", package)
return True, result.stdout
else:
logger.error("Failed to install %s: %s", package, result.stderr)
return False, result.stderr
except subprocess.TimeoutExpired:
return False, "Installation timed out"
except Exception as e:
return False, str(e)
def resolve_dependencies(
frontmatter: Dict[str, Any],
auto_install: bool = False,
quiet: bool = False
) -> Tuple[bool, List[str], List[str]]:
"""
Resolve skill dependencies.
Args:
frontmatter: Skill frontmatter dict
auto_install: If True, install missing deps without asking
quiet: If True, suppress output
Returns:
Tuple of (all_satisfied, installed_now, still_missing)
"""
requires = parse_requires(frontmatter)
if not requires:
return True, [], []
installed, missing = check_dependencies(requires)
if not missing:
if not quiet:
logger.debug("All dependencies satisfied: %s", installed)
return True, [], []
if not auto_install:
if not quiet:
logger.warning("Missing dependencies: %s", missing)
return False, [], missing
# Auto-install missing dependencies
installed_now = []
still_missing = []
for pkg in missing:
if not quiet:
logger.info("Installing missing dependency: %s", pkg)
success, output = install_dependency(pkg, quiet=quiet)
if success:
installed_now.append(pkg)
else:
still_missing.append(pkg)
logger.error("Failed to install %s: %s", pkg, output[:200])
all_satisfied = len(still_missing) == 0
return all_satisfied, installed_now, still_missing
def check_skill_dependencies(skill_dir) -> Dict[str, Any]:
"""
Check dependencies for a skill directory.
Returns:
Dict with dependency status
"""
from pathlib import Path
skill_md = Path(skill_dir) / "SKILL.md"
if not skill_md.exists():
return {"requires": [], "installed": [], "missing": [], "satisfied": True}
try:
content = skill_md.read_text()
from agent.skill_utils import parse_frontmatter
frontmatter, _ = parse_frontmatter(content)
except Exception:
return {"requires": [], "installed": [], "missing": [], "satisfied": True}
requires = parse_requires(frontmatter)
installed, missing = check_dependencies(requires)
return {
"requires": requires,
"installed": installed,
"missing": missing,
"satisfied": len(missing) == 0
}