diff --git a/config/allowlist.yaml b/config/allowlist.yaml new file mode 100644 index 00000000..efb06c9e --- /dev/null +++ b/config/allowlist.yaml @@ -0,0 +1,77 @@ +# ── Tool Allowlist — autonomous operation gate ───────────────────────────── +# +# When Timmy runs without a human present (non-interactive terminal, or +# --autonomous flag), tool calls matching these patterns execute without +# confirmation. Anything NOT listed here is auto-rejected. +# +# This file is the ONLY gate for autonomous tool execution. +# GOLDEN_TIMMY in approvals.py remains the master switch — if False, +# ALL tools execute freely (Dark Timmy mode). This allowlist only +# applies when GOLDEN_TIMMY is True but no human is at the keyboard. +# +# Edit with care. This is sovereignty in action. +# ──────────────────────────────────────────────────────────────────────────── + +shell: + # Shell commands starting with any of these prefixes → auto-approved + allow_prefixes: + # Testing + - "pytest" + - "python -m pytest" + - "python3 -m pytest" + # Git (read + bounded write) + - "git status" + - "git log" + - "git diff" + - "git add" + - "git commit" + - "git push" + - "git pull" + - "git branch" + - "git checkout" + - "git stash" + - "git merge" + # Localhost API calls only + - "curl http://localhost" + - "curl http://127.0.0.1" + - "curl -s http://localhost" + - "curl -s http://127.0.0.1" + # Read-only inspection + - "ls" + - "cat " + - "head " + - "tail " + - "find " + - "grep " + - "wc " + - "echo " + - "pwd" + - "which " + - "ollama list" + - "ollama ps" + + # Commands containing ANY of these → always blocked, even if prefix matches + deny_patterns: + - "rm -rf /" + - "sudo " + - "> /dev/" + - "| sh" + - "| bash" + - "| zsh" + - "mkfs" + - "dd if=" + - ":(){:|:&};:" + +write_file: + # Only allow writes to paths under these prefixes + allowed_path_prefixes: + - "~/Timmy-Time-dashboard/" + - "/tmp/" + +python: + # Python execution auto-approved (sandboxed by Agno's PythonTools) + auto_approve: true + +plan_and_execute: + # Multi-step plans auto-approved — individual tool calls are still gated + auto_approve: true diff --git a/src/timmy/cli.py b/src/timmy/cli.py index 92886242..2d86c3e8 100644 --- a/src/timmy/cli.py +++ b/src/timmy/cli.py @@ -1,11 +1,12 @@ import logging import subprocess +import sys import typer from timmy.agent import create_timmy from timmy.prompts import STATUS_PROMPT -from timmy.tool_safety import format_action_description, get_impact_level +from timmy.tool_safety import format_action_description, get_impact_level, is_allowlisted logger = logging.getLogger(__name__) @@ -30,15 +31,26 @@ _MODEL_SIZE_OPTION = typer.Option( ) -def _handle_tool_confirmation(agent, run_output, session_id: str): +def _is_interactive() -> bool: + """Return True if stdin is a real terminal (human present).""" + return hasattr(sys.stdin, "isatty") and sys.stdin.isatty() + + +def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False): """Prompt user to approve/reject dangerous tool calls. When Agno pauses a run because a tool requires confirmation, this function displays the action, asks for approval via stdin, and resumes or rejects the run accordingly. + When autonomous=True (or stdin is not a terminal), tool calls are + checked against config/allowlist.yaml instead of prompting. + Allowlisted calls are auto-approved; everything else is auto-rejected. + Returns the final RunOutput after all confirmations are resolved. """ + interactive = _is_interactive() and not autonomous + max_rounds = 10 # safety limit for _ in range(max_rounds): status = getattr(run_output, "status", None) @@ -58,22 +70,34 @@ def _handle_tool_confirmation(agent, run_output, session_id: str): tool_name = getattr(te, "tool_name", "unknown") tool_args = getattr(te, "tool_args", {}) or {} - description = format_action_description(tool_name, tool_args) - impact = get_impact_level(tool_name) + if interactive: + # Human present — prompt for approval + description = format_action_description(tool_name, tool_args) + impact = get_impact_level(tool_name) - typer.echo() - typer.echo(typer.style("Tool confirmation required", bold=True)) - typer.echo(f" Impact: {impact.upper()}") - typer.echo(f" {description}") - typer.echo() + typer.echo() + typer.echo(typer.style("Tool confirmation required", bold=True)) + typer.echo(f" Impact: {impact.upper()}") + typer.echo(f" {description}") + typer.echo() - approved = typer.confirm("Allow this action?", default=False) - if approved: - req.confirm() - logger.info("CLI: approved %s", tool_name) + approved = typer.confirm("Allow this action?", default=False) + if approved: + req.confirm() + logger.info("CLI: approved %s", tool_name) + else: + req.reject(note="User rejected from CLI") + logger.info("CLI: rejected %s", tool_name) else: - req.reject(note="User rejected from CLI") - logger.info("CLI: rejected %s", tool_name) + # Autonomous mode — check allowlist + if is_allowlisted(tool_name, tool_args): + req.confirm() + logger.info("AUTO-APPROVED (allowlist): %s", tool_name) + else: + req.reject(note="Auto-rejected: not in allowlist") + logger.info( + "AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100] + ) # Resume the run so the agent sees the confirmation result try: @@ -133,11 +157,21 @@ def chat( "--session-id", help="Use a specific session ID for this conversation", ), + autonomous: bool = typer.Option( + False, + "--autonomous", + "-a", + help="Autonomous mode: auto-approve allowlisted tools, reject the rest (no stdin prompts)", + ), ): """Send a message to Timmy. Conversation history persists across invocations. Use --new to start fresh, or --session-id to use a specific session. + + Use --autonomous for non-interactive contexts (scripts, dev loops). Tool + calls are checked against config/allowlist.yaml — allowlisted operations + execute automatically, everything else is safely rejected. """ import uuid @@ -153,7 +187,7 @@ def chat( run_output = timmy.run(message, stream=False, session_id=session_id) # Handle paused runs — dangerous tools need user approval - run_output = _handle_tool_confirmation(timmy, run_output, session_id) + run_output = _handle_tool_confirmation(timmy, run_output, session_id, autonomous=autonomous) # Print the final response content = run_output.content if hasattr(run_output, "content") else str(run_output) diff --git a/src/timmy/tool_safety.py b/src/timmy/tool_safety.py index 62cd8d1e..95fee7d4 100644 --- a/src/timmy/tool_safety.py +++ b/src/timmy/tool_safety.py @@ -5,13 +5,19 @@ Classifies tools into tiers based on their potential impact: Requires user confirmation before execution. - SAFE: Read-only or purely computational. Executes without confirmation. -Also provides shared helpers for extracting hallucinated tool calls from -model output and formatting them for human review. Used by both the -Discord vendor and the dashboard chat route. +Also provides: +- Allowlist checker: reads config/allowlist.yaml to auto-approve bounded + tool calls when no human is present (autonomous mode). +- Shared helpers for extracting hallucinated tool calls from model output + and formatting them for human review. """ import json +import logging import re +from pathlib import Path + +logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Tool classification @@ -71,6 +77,133 @@ def requires_confirmation(tool_name: str) -> bool: return True +# --------------------------------------------------------------------------- +# Allowlist — autonomous tool approval +# --------------------------------------------------------------------------- + +_ALLOWLIST_PATHS = [ + Path(__file__).resolve().parent.parent.parent / "config" / "allowlist.yaml", + Path.home() / "Timmy-Time-dashboard" / "config" / "allowlist.yaml", +] + +_allowlist_cache: dict | None = None + + +def _load_allowlist() -> dict: + """Load and cache allowlist.yaml. Returns {} if not found.""" + global _allowlist_cache + if _allowlist_cache is not None: + return _allowlist_cache + + try: + import yaml + except ImportError: + logger.debug("PyYAML not installed — allowlist disabled") + _allowlist_cache = {} + return _allowlist_cache + + for path in _ALLOWLIST_PATHS: + if path.is_file(): + try: + with open(path) as f: + _allowlist_cache = yaml.safe_load(f) or {} + logger.info("Loaded tool allowlist from %s", path) + return _allowlist_cache + except Exception as exc: + logger.warning("Failed to load allowlist %s: %s", path, exc) + + _allowlist_cache = {} + return _allowlist_cache + + +def reload_allowlist() -> None: + """Force a reload of the allowlist config (e.g., after editing YAML).""" + global _allowlist_cache + _allowlist_cache = None + _load_allowlist() + + +def is_allowlisted(tool_name: str, tool_args: dict | None = None) -> bool: + """Check if a specific tool call is allowlisted for autonomous execution. + + Returns True only when the tool call matches an explicit allowlist rule. + Returns False for anything not covered — safe-by-default. + """ + allowlist = _load_allowlist() + if not allowlist: + return False + + rule = allowlist.get(tool_name) + if rule is None: + return False + + tool_args = tool_args or {} + + # Simple auto-approve flag + if rule.get("auto_approve") is True: + return True + + # Shell: prefix + deny pattern matching + if tool_name == "shell": + return _check_shell_allowlist(rule, tool_args) + + # write_file: path prefix check + if tool_name == "write_file": + return _check_write_file_allowlist(rule, tool_args) + + return False + + +def _check_shell_allowlist(rule: dict, tool_args: dict) -> bool: + """Check if a shell command matches the allowlist.""" + # Extract the command string — Agno ShellTools uses "args" (list or str) + cmd = tool_args.get("command") or tool_args.get("args", "") + if isinstance(cmd, list): + cmd = " ".join(cmd) + cmd = cmd.strip() + + if not cmd: + return False + + # Check deny patterns first — these always block + deny_patterns = rule.get("deny_patterns", []) + for pattern in deny_patterns: + if pattern in cmd: + logger.warning("Shell command blocked by deny pattern %r: %s", pattern, cmd[:100]) + return False + + # Check allow prefixes + allow_prefixes = rule.get("allow_prefixes", []) + for prefix in allow_prefixes: + if cmd.startswith(prefix): + logger.info("Shell command auto-approved by prefix %r: %s", prefix, cmd[:100]) + return True + + return False + + +def _check_write_file_allowlist(rule: dict, tool_args: dict) -> bool: + """Check if a write_file target is within allowed paths.""" + path_str = tool_args.get("file_name") or tool_args.get("path", "") + if not path_str: + return False + + # Resolve ~ to home + if path_str.startswith("~"): + path_str = str(Path(path_str).expanduser()) + + allowed_prefixes = rule.get("allowed_path_prefixes", []) + for prefix in allowed_prefixes: + # Resolve ~ in the prefix too + if prefix.startswith("~"): + prefix = str(Path(prefix).expanduser()) + if path_str.startswith(prefix): + logger.info("write_file auto-approved for path: %s", path_str) + return True + + return False + + # --------------------------------------------------------------------------- # Tool call extraction from model output # --------------------------------------------------------------------------- diff --git a/src/timmy/tools.py b/src/timmy/tools.py index 79f94556..a7513997 100644 --- a/src/timmy/tools.py +++ b/src/timmy/tools.py @@ -504,8 +504,11 @@ def create_full_toolkit(base_dir: str | Path | None = None): toolkit = Toolkit( name="full", - requires_confirmation_tools=list(DANGEROUS_TOOLS), ) + # Set requires_confirmation_tools AFTER construction (avoids agno WARNING + # about tools not yet registered) but BEFORE register() calls (so each + # Function gets requires_confirmation=True). Fixes #79. + toolkit.requires_confirmation_tools = list(DANGEROUS_TOOLS) # Web search (optional — degrades gracefully if ddgs not installed) if _DUCKDUCKGO_AVAILABLE: diff --git a/tests/timmy/test_cli.py b/tests/timmy/test_cli.py index 4122dc5d..83e70187 100644 --- a/tests/timmy/test_cli.py +++ b/tests/timmy/test_cli.py @@ -177,8 +177,11 @@ def test_handle_tool_confirmation_approve(): mock_agent = MagicMock() mock_agent.continue_run.return_value = completed_run - # Simulate user typing "y" at the prompt - with patch("timmy.cli.typer.confirm", return_value=True): + # Simulate user typing "y" at the prompt (mock interactive terminal) + with ( + patch("timmy.cli._is_interactive", return_value=True), + patch("timmy.cli.typer.confirm", return_value=True), + ): result = _handle_tool_confirmation(mock_agent, paused_run, "cli") mock_req.confirm.assert_called_once() @@ -198,7 +201,10 @@ def test_handle_tool_confirmation_reject(): mock_agent = MagicMock() mock_agent.continue_run.return_value = completed_run - with patch("timmy.cli.typer.confirm", return_value=False): + with ( + patch("timmy.cli._is_interactive", return_value=True), + patch("timmy.cli.typer.confirm", return_value=False), + ): _handle_tool_confirmation(mock_agent, paused_run, "cli") mock_req.reject.assert_called_once() @@ -225,8 +231,49 @@ def test_handle_tool_confirmation_continue_error(): mock_agent = MagicMock() mock_agent.continue_run.side_effect = Exception("connection lost") - with patch("timmy.cli.typer.confirm", return_value=True): + with ( + patch("timmy.cli._is_interactive", return_value=True), + patch("timmy.cli.typer.confirm", return_value=True), + ): result = _handle_tool_confirmation(mock_agent, paused_run, "cli") # Should return the original paused run, not crash assert result is paused_run + + +def test_handle_tool_confirmation_autonomous_allowlisted(): + """In autonomous mode, allowlisted tools should be auto-approved.""" + paused_run, mock_req = _make_paused_run( + tool_name="shell", tool_args={"command": "pytest tests/ -x"} + ) + + completed_run = MagicMock() + completed_run.status = "COMPLETED" + completed_run.active_requirements = [] + + mock_agent = MagicMock() + mock_agent.continue_run.return_value = completed_run + + with patch("timmy.cli.is_allowlisted", return_value=True): + _handle_tool_confirmation(mock_agent, paused_run, "cli", autonomous=True) + + mock_req.confirm.assert_called_once() + mock_req.reject.assert_not_called() + + +def test_handle_tool_confirmation_autonomous_not_allowlisted(): + """In autonomous mode, non-allowlisted tools should be auto-rejected.""" + paused_run, mock_req = _make_paused_run(tool_name="shell", tool_args={"command": "rm -rf /"}) + + completed_run = MagicMock() + completed_run.status = "COMPLETED" + completed_run.active_requirements = [] + + mock_agent = MagicMock() + mock_agent.continue_run.return_value = completed_run + + with patch("timmy.cli.is_allowlisted", return_value=False): + _handle_tool_confirmation(mock_agent, paused_run, "cli", autonomous=True) + + mock_req.reject.assert_called_once() + mock_req.confirm.assert_not_called() diff --git a/tests/timmy/test_timmy_tools.py b/tests/timmy/test_timmy_tools.py index 31f4d260..d5e9a113 100644 --- a/tests/timmy/test_timmy_tools.py +++ b/tests/timmy/test_timmy_tools.py @@ -194,3 +194,43 @@ class TestAiderTool: catalog = get_all_available_tools() assert "aider" in catalog assert "forge" in catalog["aider"]["available_in"] + + +class TestFullToolkitConfirmationWarning: + """Regression tests for issue #79 — confirmation tool WARNING spam.""" + + def test_create_full_toolkit_no_confirmation_warning(self, caplog): + """create_full_toolkit should not emit 'Requires confirmation tool(s)' warnings. + + Agno's Toolkit.__init__ validates requires_confirmation_tools against the + initial (empty) tool list. We set the attribute *after* construction to + avoid the spurious warning while keeping per-tool confirmation checks. + """ + import logging + + from timmy.tools import create_full_toolkit + + with caplog.at_level(logging.WARNING): + create_full_toolkit() + + warning_msgs = [ + r.message for r in caplog.records if "Requires confirmation tool" in r.message + ] + assert warning_msgs == [], f"Unexpected confirmation warnings: {warning_msgs}" + + def test_dangerous_tools_still_require_confirmation(self): + """After the fix, dangerous tools registered in the toolkit must still be + marked as requiring confirmation.""" + from timmy.tools import create_full_toolkit + + toolkit = create_full_toolkit() + if toolkit is None: + pytest.skip("Agno tools not available") + + # shell and python are always registered in create_full_toolkit + for name in ("shell", "python", "write_file"): + func = toolkit.functions.get(name) + if func is not None: + assert func.requires_confirmation, ( + f"Tool '{name}' should require confirmation but doesn't" + ) diff --git a/tests/timmy/test_tool_safety.py b/tests/timmy/test_tool_safety.py index 3c2e12d0..49b8fc9a 100644 --- a/tests/timmy/test_tool_safety.py +++ b/tests/timmy/test_tool_safety.py @@ -1,9 +1,18 @@ -"""Tests for timmy.tool_safety — classification, extraction, and formatting.""" +"""Tests for timmy.tool_safety — classification, extraction, formatting, and allowlist.""" + +from pathlib import Path +from unittest.mock import patch + +import pytest from timmy.tool_safety import ( + _check_shell_allowlist, + _check_write_file_allowlist, extract_tool_calls, format_action_description, get_impact_level, + is_allowlisted, + reload_allowlist, requires_confirmation, ) @@ -111,3 +120,206 @@ class TestGetImpactLevel: def test_low(self): assert get_impact_level("web_search") == "low" assert get_impact_level("unknown") == "low" + + +# --------------------------------------------------------------------------- +# Allowlist — is_allowlisted +# --------------------------------------------------------------------------- + +# Sample allowlist for tests +_TEST_ALLOWLIST = { + "shell": { + "allow_prefixes": [ + "pytest", + "python -m pytest", + "git status", + "git log", + "git diff", + "git add", + "git commit", + "git push", + "curl http://localhost", + "curl -s http://localhost", + "ls", + "cat ", + ], + "deny_patterns": [ + "rm -rf /", + "sudo ", + "> /dev/", + "| sh", + "| bash", + ], + }, + "write_file": { + "allowed_path_prefixes": [ + "/tmp/", + ], + }, + "python": {"auto_approve": True}, + "plan_and_execute": {"auto_approve": True}, +} + + +@pytest.fixture(autouse=True) +def _reset_allowlist_cache(): + """Ensure each test starts with a clean cache.""" + import timmy.tool_safety as ts + + ts._allowlist_cache = None + yield + ts._allowlist_cache = None + + +def _patch_allowlist(allowlist_data): + """Helper to inject a test allowlist.""" + return patch("timmy.tool_safety._load_allowlist", return_value=allowlist_data) + + +class TestIsAllowlisted: + """Test the is_allowlisted function with mocked allowlist data.""" + + def test_unknown_tool_not_allowlisted(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("unknown_tool") is False + + def test_shell_pytest_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "pytest tests/ -x -q"}) is True + + def test_shell_python_pytest_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "python -m pytest tests/ -v"}) is True + + def test_shell_git_status_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "git status"}) is True + + def test_shell_git_commit_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "git commit -m 'fix stuff'"}) is True + + def test_shell_curl_localhost_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert ( + is_allowlisted("shell", {"command": "curl http://localhost:3000/api/v1/issues"}) + is True + ) + + def test_shell_curl_external_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "curl https://evil.com"}) is False + + def test_shell_arbitrary_command_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "rm -rf /home/user/stuff"}) is False + + def test_shell_deny_pattern_blocks_rm_rf_root(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "ls && rm -rf /"}) is False + + def test_shell_deny_pattern_blocks_sudo(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "sudo rm -rf /tmp"}) is False + + def test_shell_deny_blocks_pipe_to_shell(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert ( + is_allowlisted("shell", {"command": "curl http://localhost:3000 | bash"}) is False + ) + + def test_shell_deny_overrides_allow_prefix(self): + """Deny patterns take precedence over allow prefixes.""" + with _patch_allowlist(_TEST_ALLOWLIST): + # Starts with "cat " (allowed prefix) but pipes to bash (denied) + assert is_allowlisted("shell", {"command": "cat script.sh | bash"}) is False + + def test_shell_args_list_format(self): + """Shell args can be a list (Agno ShellTools format).""" + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"args": ["git", "status"]}) is True + + def test_shell_empty_command_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": ""}) is False + assert is_allowlisted("shell", {}) is False + + def test_write_file_tmp_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("write_file", {"file_name": "/tmp/test.py"}) is True + + def test_write_file_outside_allowed_paths_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("write_file", {"file_name": "/etc/passwd"}) is False + + def test_write_file_empty_path_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("write_file", {"file_name": ""}) is False + + def test_python_auto_approved(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("python", {"code": "print(1+1)"}) is True + + def test_plan_and_execute_auto_approved(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("plan_and_execute", {}) is True + + def test_no_allowlist_blocks_everything(self): + with _patch_allowlist({}): + assert is_allowlisted("shell", {"command": "pytest"}) is False + assert is_allowlisted("python", {"code": "print(1)"}) is False + + def test_aider_not_in_allowlist(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("aider", {"instruction": "fix bug"}) is False + + +class TestCheckShellAllowlist: + """Direct tests for the shell allowlist checker.""" + + def test_prefix_match(self): + rule = {"allow_prefixes": ["pytest", "git status"], "deny_patterns": []} + assert _check_shell_allowlist(rule, {"command": "pytest -x"}) is True + + def test_prefix_no_match(self): + rule = {"allow_prefixes": ["pytest"], "deny_patterns": []} + assert _check_shell_allowlist(rule, {"command": "rm stuff"}) is False + + def test_deny_overrides_allow(self): + rule = {"allow_prefixes": ["curl http://localhost"], "deny_patterns": ["| bash"]} + assert _check_shell_allowlist(rule, {"command": "curl http://localhost | bash"}) is False + + +class TestCheckWriteFileAllowlist: + """Direct tests for the write_file allowlist checker.""" + + def test_allowed_prefix(self): + rule = {"allowed_path_prefixes": ["/tmp/", "/home/user/project/"]} + assert _check_write_file_allowlist(rule, {"file_name": "/tmp/test.py"}) is True + + def test_blocked_path(self): + rule = {"allowed_path_prefixes": ["/tmp/"]} + assert _check_write_file_allowlist(rule, {"file_name": "/etc/secrets"}) is False + + def test_tilde_expansion(self): + """Paths starting with ~ should be expanded.""" + home = str(Path.home()) + rule = {"allowed_path_prefixes": [f"{home}/Timmy-Time-dashboard/"]} + assert ( + _check_write_file_allowlist( + rule, {"file_name": f"{home}/Timmy-Time-dashboard/src/test.py"} + ) + is True + ) + + +class TestReloadAllowlist: + """Test that reload_allowlist clears the cache.""" + + def test_reload_clears_cache(self): + import timmy.tool_safety as ts + + ts._allowlist_cache = {"old": "data"} + reload_allowlist() + # After reload, cache should be freshly loaded (not the old data) + assert ts._allowlist_cache != {"old": "data"}