This repository has been archived on 2026-03-24. You can view files and clone it. You cannot open issues or pull requests or push a commit.
Files
Timmy-time-dashboard/src/timmy/tool_safety.py
2026-03-18 20:36:38 -04:00

273 lines
8.2 KiB
Python

"""Tool safety classification and tool-call extraction helpers.
Classifies tools into tiers based on their potential impact:
- DANGEROUS: Can modify filesystem, execute code, or change system state.
Requires user confirmation before execution.
- SAFE: Read-only or purely computational. Executes without confirmation.
Also provides:
- Allowlist checker: reads config/allowlist.yaml to auto-approve bounded
tool calls when no human is present (autonomous mode).
- Shared helpers for extracting hallucinated tool calls from model output
and formatting them for human review.
"""
import json
import logging
import re
from pathlib import Path
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Tool classification
# ---------------------------------------------------------------------------
# Tools that require confirmation before execution.
DANGEROUS_TOOLS = frozenset(
{
"shell",
"python",
"write_file",
"aider",
"plan_and_execute",
}
)
# Tools that are safe to execute without confirmation.
SAFE_TOOLS = frozenset(
{
"calculator",
"memory_search",
"memory_read",
"memory_write",
"read_file",
"list_files",
"consult_grok",
"get_system_info",
"check_ollama_health",
"get_memory_status",
"list_swarm_agents",
# Artifact tools
"jot_note",
"log_decision",
# MCP Gitea tools
"issue_write",
"issue_read",
"list_issues",
"pull_request_write",
"pull_request_read",
"list_pull_requests",
"list_branches",
"list_commits",
# MCP filesystem tools (read-only)
"list_directory",
"search_files",
"get_file_info",
"directory_tree",
}
)
def requires_confirmation(tool_name: str) -> bool:
"""Check if a tool requires user confirmation before execution.
Unknown tools default to requiring confirmation (safe-by-default).
"""
if tool_name in SAFE_TOOLS:
return False
return True
# ---------------------------------------------------------------------------
# Allowlist — autonomous tool approval
# ---------------------------------------------------------------------------
_ALLOWLIST_PATHS = [
Path(__file__).resolve().parent.parent.parent / "config" / "allowlist.yaml",
Path.home() / "Timmy-Time-dashboard" / "config" / "allowlist.yaml",
]
_allowlist_cache: dict | None = None
def _load_allowlist() -> dict:
"""Load and cache allowlist.yaml. Returns {} if not found."""
global _allowlist_cache
if _allowlist_cache is not None:
return _allowlist_cache
try:
import yaml
except ImportError:
logger.debug("PyYAML not installed — allowlist disabled")
_allowlist_cache = {}
return _allowlist_cache
for path in _ALLOWLIST_PATHS:
if path.is_file():
try:
with open(path) as f:
_allowlist_cache = yaml.safe_load(f) or {}
logger.info("Loaded tool allowlist from %s", path)
return _allowlist_cache
except Exception as exc:
logger.warning("Failed to load allowlist %s: %s", path, exc)
_allowlist_cache = {}
return _allowlist_cache
def reload_allowlist() -> None:
"""Force a reload of the allowlist config (e.g., after editing YAML)."""
global _allowlist_cache
_allowlist_cache = None
_load_allowlist()
def is_allowlisted(tool_name: str, tool_args: dict | None = None) -> bool:
"""Check if a specific tool call is allowlisted for autonomous execution.
Returns True only when the tool call matches an explicit allowlist rule.
Returns False for anything not covered — safe-by-default.
"""
allowlist = _load_allowlist()
if not allowlist:
return False
rule = allowlist.get(tool_name)
if rule is None:
return False
tool_args = tool_args or {}
# Simple auto-approve flag
if rule.get("auto_approve") is True:
return True
# Shell: prefix + deny pattern matching
if tool_name == "shell":
return _check_shell_allowlist(rule, tool_args)
# write_file: path prefix check
if tool_name == "write_file":
return _check_write_file_allowlist(rule, tool_args)
return False
def _check_shell_allowlist(rule: dict, tool_args: dict) -> bool:
"""Check if a shell command matches the allowlist."""
# Extract the command string — Agno ShellTools uses "args" (list or str)
cmd = tool_args.get("command") or tool_args.get("args", "")
if isinstance(cmd, list):
cmd = " ".join(cmd)
cmd = cmd.strip()
if not cmd:
return False
# Check deny patterns first — these always block
deny_patterns = rule.get("deny_patterns", [])
for pattern in deny_patterns:
if pattern in cmd:
logger.warning("Shell command blocked by deny pattern %r: %s", pattern, cmd[:100])
return False
# Check allow prefixes
allow_prefixes = rule.get("allow_prefixes", [])
for prefix in allow_prefixes:
if cmd.startswith(prefix):
logger.info("Shell command auto-approved by prefix %r: %s", prefix, cmd[:100])
return True
return False
def _check_write_file_allowlist(rule: dict, tool_args: dict) -> bool:
"""Check if a write_file target is within allowed paths."""
path_str = tool_args.get("file_name") or tool_args.get("path", "")
if not path_str:
return False
# Resolve ~ to home
if path_str.startswith("~"):
path_str = str(Path(path_str).expanduser())
allowed_prefixes = rule.get("allowed_path_prefixes", [])
for prefix in allowed_prefixes:
# Resolve ~ in the prefix too
if prefix.startswith("~"):
prefix = str(Path(prefix).expanduser())
if path_str.startswith(prefix):
logger.info("write_file auto-approved for path: %s", path_str)
return True
return False
# ---------------------------------------------------------------------------
# Tool call extraction from model output
# ---------------------------------------------------------------------------
_TOOL_CALL_RE = re.compile(
r'\{\s*"name"\s*:\s*"([^"]+?)"\s*,\s*"(?:parameters|arguments)"\s*:\s*(\{.*?\})\s*\}',
re.DOTALL,
)
def extract_tool_calls(text: str) -> list[tuple[str, dict]]:
"""Extract hallucinated tool calls from model output.
Returns list of (tool_name, arguments_dict) tuples.
Handles both ``"arguments"`` and ``"parameters"`` JSON keys.
"""
if not text:
return []
results = []
for match in _TOOL_CALL_RE.finditer(text):
tool_name = match.group(1)
try:
args = json.loads(match.group(2))
except json.JSONDecodeError:
continue
results.append((tool_name, args))
return results
# ---------------------------------------------------------------------------
# Formatting helpers
# ---------------------------------------------------------------------------
def format_action_description(tool_name: str, tool_args: dict) -> str:
"""Format a human-readable description of a tool action."""
if tool_name == "shell":
cmd = tool_args.get("command") or tool_args.get("args", "")
if isinstance(cmd, list):
cmd = " ".join(cmd)
return f"Run shell command:\n`{cmd}`"
elif tool_name == "write_file":
path = tool_args.get("file_name", "unknown")
size = len(tool_args.get("contents", ""))
return f"Write file: `{path}` ({size} chars)"
elif tool_name == "python":
code = tool_args.get("code", "")[:200]
return f"Execute Python:\n```python\n{code}\n```"
else:
args_str = json.dumps(tool_args, indent=2)[:300]
return f"Execute `{tool_name}` with args:\n```json\n{args_str}\n```"
def get_impact_level(tool_name: str) -> str:
"""Return the impact level for a tool (high, medium, or low)."""
high_impact = {"shell", "python"}
medium_impact = {"write_file", "aider", "plan_and_execute"}
if tool_name in high_impact:
return "high"
if tool_name in medium_impact:
return "medium"
return "low"