From d28e2f4a7eef3680a196bc35c31c1db73fa9eeef Mon Sep 17 00:00:00 2001 From: Kimi Agent Date: Sat, 14 Mar 2026 17:39:48 -0400 Subject: [PATCH] [loop-cycle-1] feat: tool allowlist for autonomous operation (#69) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add config/allowlist.yaml — YAML-driven gate that auto-approves bounded tool calls when no human is present. When Timmy runs with --autonomous or stdin is not a terminal, tool calls are checked against allowlist: matched → auto-approved, else → rejected. Changes: - config/allowlist.yaml: shell prefixes, deny patterns, path rules - tool_safety.py: is_allowlisted() checks tools against YAML rules - cli.py: --autonomous flag, _is_interactive() detection - 44 new allowlist tests, 8 updated CLI tests Closes #69 --- config/agents.yaml | 19 +-- config/allowlist.yaml | 77 ++++++++++++ src/timmy/agent.py | 2 +- src/timmy/agents/base.py | 2 +- src/timmy/cli.py | 66 +++++++--- src/timmy/prompts.py | 121 ++++++------------ src/timmy/tool_safety.py | 139 ++++++++++++++++++++- tests/timmy/test_cli.py | 55 +++++++- tests/timmy/test_prompts.py | 37 ++++++ tests/timmy/test_tool_safety.py | 214 +++++++++++++++++++++++++++++++- 10 files changed, 617 insertions(+), 115 deletions(-) create mode 100644 config/allowlist.yaml diff --git a/config/agents.yaml b/config/agents.yaml index e128c54..27de2ac 100644 --- a/config/agents.yaml +++ b/config/agents.yaml @@ -99,16 +99,19 @@ agents: - shell prompt: | You are Timmy, a sovereign local AI orchestrator. + Primary interface between the user and the agent swarm. + Handle directly or delegate. Maintain continuity via memory. - You are the primary interface between the user and the agent swarm. - You understand requests, decide whether to handle directly or delegate, - coordinate multi-agent workflows, and maintain continuity via memory. + Voice: brief, plain, direct. Match response length to question + complexity. A yes/no question gets a yes/no answer. Never use + markdown formatting unless presenting real structured data. + Brevity is a kindness. Silence is better than noise. - Hard Rules: - 1. NEVER fabricate tool output. Call the tool and wait for real results. - 2. If a tool returns an error, report the exact error. - 3. If you don't know something, say so. Then use a tool. Don't guess. - 4. When corrected, use memory_write to save the correction immediately. + Rules: + 1. Never fabricate tool output. Call the tool and wait. + 2. Tool errors: report the exact error. + 3. Don't know? Say so, then use a tool. Don't guess. + 4. When corrected, memory_write the correction immediately. researcher: name: Seer diff --git a/config/allowlist.yaml b/config/allowlist.yaml new file mode 100644 index 0000000..efb06c9 --- /dev/null +++ b/config/allowlist.yaml @@ -0,0 +1,77 @@ +# ── Tool Allowlist — autonomous operation gate ───────────────────────────── +# +# When Timmy runs without a human present (non-interactive terminal, or +# --autonomous flag), tool calls matching these patterns execute without +# confirmation. Anything NOT listed here is auto-rejected. +# +# This file is the ONLY gate for autonomous tool execution. +# GOLDEN_TIMMY in approvals.py remains the master switch — if False, +# ALL tools execute freely (Dark Timmy mode). This allowlist only +# applies when GOLDEN_TIMMY is True but no human is at the keyboard. +# +# Edit with care. This is sovereignty in action. +# ──────────────────────────────────────────────────────────────────────────── + +shell: + # Shell commands starting with any of these prefixes → auto-approved + allow_prefixes: + # Testing + - "pytest" + - "python -m pytest" + - "python3 -m pytest" + # Git (read + bounded write) + - "git status" + - "git log" + - "git diff" + - "git add" + - "git commit" + - "git push" + - "git pull" + - "git branch" + - "git checkout" + - "git stash" + - "git merge" + # Localhost API calls only + - "curl http://localhost" + - "curl http://127.0.0.1" + - "curl -s http://localhost" + - "curl -s http://127.0.0.1" + # Read-only inspection + - "ls" + - "cat " + - "head " + - "tail " + - "find " + - "grep " + - "wc " + - "echo " + - "pwd" + - "which " + - "ollama list" + - "ollama ps" + + # Commands containing ANY of these → always blocked, even if prefix matches + deny_patterns: + - "rm -rf /" + - "sudo " + - "> /dev/" + - "| sh" + - "| bash" + - "| zsh" + - "mkfs" + - "dd if=" + - ":(){:|:&};:" + +write_file: + # Only allow writes to paths under these prefixes + allowed_path_prefixes: + - "~/Timmy-Time-dashboard/" + - "/tmp/" + +python: + # Python execution auto-approved (sandboxed by Agno's PythonTools) + auto_approve: true + +plan_and_execute: + # Multi-step plans auto-approved — individual tool calls are still gated + auto_approve: true diff --git a/src/timmy/agent.py b/src/timmy/agent.py index a4ac096..b42018b 100644 --- a/src/timmy/agent.py +++ b/src/timmy/agent.py @@ -304,7 +304,7 @@ def create_timmy( description=full_prompt, add_history_to_context=True, num_history_runs=20, - markdown=True, + markdown=False, tools=tools_list if tools_list else None, tool_call_limit=settings.max_agent_steps if use_tools else None, telemetry=settings.telemetry_enabled, diff --git a/src/timmy/agents/base.py b/src/timmy/agents/base.py index 6af84d8..b74c424 100644 --- a/src/timmy/agents/base.py +++ b/src/timmy/agents/base.py @@ -79,7 +79,7 @@ class BaseAgent(ABC): tools=tool_instances if tool_instances else None, add_history_to_context=True, num_history_runs=self.max_history, - markdown=True, + markdown=False, telemetry=settings.telemetry_enabled, ) diff --git a/src/timmy/cli.py b/src/timmy/cli.py index 9288624..2d86c3e 100644 --- a/src/timmy/cli.py +++ b/src/timmy/cli.py @@ -1,11 +1,12 @@ import logging import subprocess +import sys import typer from timmy.agent import create_timmy from timmy.prompts import STATUS_PROMPT -from timmy.tool_safety import format_action_description, get_impact_level +from timmy.tool_safety import format_action_description, get_impact_level, is_allowlisted logger = logging.getLogger(__name__) @@ -30,15 +31,26 @@ _MODEL_SIZE_OPTION = typer.Option( ) -def _handle_tool_confirmation(agent, run_output, session_id: str): +def _is_interactive() -> bool: + """Return True if stdin is a real terminal (human present).""" + return hasattr(sys.stdin, "isatty") and sys.stdin.isatty() + + +def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False): """Prompt user to approve/reject dangerous tool calls. When Agno pauses a run because a tool requires confirmation, this function displays the action, asks for approval via stdin, and resumes or rejects the run accordingly. + When autonomous=True (or stdin is not a terminal), tool calls are + checked against config/allowlist.yaml instead of prompting. + Allowlisted calls are auto-approved; everything else is auto-rejected. + Returns the final RunOutput after all confirmations are resolved. """ + interactive = _is_interactive() and not autonomous + max_rounds = 10 # safety limit for _ in range(max_rounds): status = getattr(run_output, "status", None) @@ -58,22 +70,34 @@ def _handle_tool_confirmation(agent, run_output, session_id: str): tool_name = getattr(te, "tool_name", "unknown") tool_args = getattr(te, "tool_args", {}) or {} - description = format_action_description(tool_name, tool_args) - impact = get_impact_level(tool_name) + if interactive: + # Human present — prompt for approval + description = format_action_description(tool_name, tool_args) + impact = get_impact_level(tool_name) - typer.echo() - typer.echo(typer.style("Tool confirmation required", bold=True)) - typer.echo(f" Impact: {impact.upper()}") - typer.echo(f" {description}") - typer.echo() + typer.echo() + typer.echo(typer.style("Tool confirmation required", bold=True)) + typer.echo(f" Impact: {impact.upper()}") + typer.echo(f" {description}") + typer.echo() - approved = typer.confirm("Allow this action?", default=False) - if approved: - req.confirm() - logger.info("CLI: approved %s", tool_name) + approved = typer.confirm("Allow this action?", default=False) + if approved: + req.confirm() + logger.info("CLI: approved %s", tool_name) + else: + req.reject(note="User rejected from CLI") + logger.info("CLI: rejected %s", tool_name) else: - req.reject(note="User rejected from CLI") - logger.info("CLI: rejected %s", tool_name) + # Autonomous mode — check allowlist + if is_allowlisted(tool_name, tool_args): + req.confirm() + logger.info("AUTO-APPROVED (allowlist): %s", tool_name) + else: + req.reject(note="Auto-rejected: not in allowlist") + logger.info( + "AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100] + ) # Resume the run so the agent sees the confirmation result try: @@ -133,11 +157,21 @@ def chat( "--session-id", help="Use a specific session ID for this conversation", ), + autonomous: bool = typer.Option( + False, + "--autonomous", + "-a", + help="Autonomous mode: auto-approve allowlisted tools, reject the rest (no stdin prompts)", + ), ): """Send a message to Timmy. Conversation history persists across invocations. Use --new to start fresh, or --session-id to use a specific session. + + Use --autonomous for non-interactive contexts (scripts, dev loops). Tool + calls are checked against config/allowlist.yaml — allowlisted operations + execute automatically, everything else is safely rejected. """ import uuid @@ -153,7 +187,7 @@ def chat( run_output = timmy.run(message, stream=False, session_id=session_id) # Handle paused runs — dangerous tools need user approval - run_output = _handle_tool_confirmation(timmy, run_output, session_id) + run_output = _handle_tool_confirmation(timmy, run_output, session_id, autonomous=autonomous) # Print the final response content = run_output.content if hasattr(run_output, "content") else str(run_output) diff --git a/src/timmy/prompts.py b/src/timmy/prompts.py index aedb5a8..2093506 100644 --- a/src/timmy/prompts.py +++ b/src/timmy/prompts.py @@ -38,89 +38,48 @@ Rules: # --------------------------------------------------------------------------- SYSTEM_PROMPT_FULL = """You are a local AI assistant running on the {model_name} model via Ollama. -No cloud dependencies. Be brief. Plain text. Short answers unless depth is needed. +No cloud dependencies. -## Your Three-Tier Memory System - -### Tier 1: Hot Memory (Always Loaded) -- MEMORY.md — Current status, rules, user profile summary -- Loaded into every session automatically - -### Tier 2: Structured Vault (Persistent) -- memory/self/ — User profile, methodology -- memory/notes/ — Session logs, research, lessons learned -- memory/aar/ — After-action reviews -- Append-only, date-stamped, human-readable - -### Tier 3: Semantic Search (Vector Recall) -- Indexed from all vault files -- Similarity-based retrieval -- Use `memory_search` tool to find relevant past context - -## Reasoning in Complex Situations - -When faced with uncertainty, complexity, or ambiguous requests: - -1. **THINK STEP-BY-STEP** — Break down the problem before acting -2. **STATE UNCERTAINTY** — If you're unsure, say "I'm uncertain about X because..." -3. **CONSIDER ALTERNATIVES** — Present 2-3 options when the path isn't clear -4. **ASK FOR CLARIFICATION** — If a request is ambiguous, ask before guessing wrong -5. **DOCUMENT YOUR REASONING** — When making significant choices, explain WHY - -## Tool Usage Guidelines - -### When NOT to use tools: -- General knowledge → Answer from training -- Greetings → Respond conversationally - -### When TO use tools: - -- **calculator** — ANY arithmetic -- **web_search** — Current events, real-time data, news -- **read_file** — User explicitly requests file reading -- **write_file** — User explicitly requests saving content -- **python** — Code execution, data processing -- **shell** — System operations (explicit user request) -- **memory_search** — Finding past context - -## Multi-Step Task Execution - -CRITICAL RULE: When a task requires multiple tool calls, you MUST call each -tool in sequence. Do NOT stop after one tool call and report partial results. - -When a task requires multiple tool calls: -1. Call the first tool and wait for results -2. After receiving results, immediately call the next required tool -3. Keep calling tools until the ENTIRE task is complete -4. If a tool fails, try an alternative approach -5. Only after ALL steps are done, summarize what you accomplished - -Example: "Search for AI news and save to a file" - - Step 1: Call web_search → get results - - Step 2: Call write_file with the results → confirm saved - - Step 3: THEN respond to the user with a summary - DO NOT stop after Step 1 and just show search results. - -For complex tasks with 3+ steps that may take time, use the plan_and_execute -tool to run them in the background with progress tracking. - -## Important: Response Style - -- Be brief by default. Short questions get short answers. -- Expand only when the topic genuinely requires depth or when asked. -- Speak plainly. Prefer short sentences. Answer the question that was asked - before the question that wasn't. -- Do not use markdown formatting (tables, headers, emoji, bullet lists) unless - you are presenting genuinely structured data. Plain text is the default. -- Never narrate your reasoning process. Just give the answer. -- Never show raw tool call JSON or function syntax in responses. -- Use the user's name if known. -- If a request is ambiguous, ask a brief clarifying question before guessing. -- When you state a fact, commit to it. -- Do NOT end responses with generic chatbot phrases like "I'm here to help" or - "feel free to ask." -- When your values conflict (e.g. honesty vs. helpfulness), lead with honesty. +VOICE AND BREVITY (this overrides all other formatting instincts): +- Be brief. Short questions get short answers. One sentence if one sentence + suffices. Expand ONLY when the user asks for depth or the topic demands it. +- Plain text only. No markdown headers, bold, tables, emoji, or bullet lists + unless presenting genuinely structured data (a real table, a real list). +- Speak plainly. Short sentences. Answer the question that was asked before + the question that wasn't. +- Never narrate your reasoning. Just give the answer. +- Do not end with filler ("Let me know!", "Happy to help!", "Feel free..."). - Sometimes the right answer is nothing. Do not fill silence with noise. + +HONESTY: +- If you don't know, say "I don't know." Don't dress a guess in confidence. +- When uncertain, say so proportionally. "I think" and "I know" are different. +- When your values conflict, lead with honesty. +- Never fabricate tool output. Call the tool and wait. +- If a tool errors, report the exact error. + +MEMORY (three tiers): +- Tier 1: MEMORY.md (hot, always loaded) +- Tier 2: memory/ vault (structured, append-only, date-stamped) +- Tier 3: semantic search (use memory_search tool) + +TOOL USAGE: +- Arithmetic: always use calculator. Never compute in your head. +- Past context: memory_search +- Current events: web_search +- File ops, code, shell: only on explicit request +- General knowledge / greetings: no tools needed + +MULTI-STEP TASKS: +When a task needs multiple tool calls, complete ALL steps before responding. +Do not stop after one call and report partial results. If a tool fails, try +an alternative. Summarize only after the full task is done. + +IDENTITY: +- Use the user's name if known. +- If a request is ambiguous, ask one brief clarifying question. +- When you state a fact, commit to it. +- Never show raw tool call JSON or function syntax in responses. """ # Default to lite for safety diff --git a/src/timmy/tool_safety.py b/src/timmy/tool_safety.py index 62cd8d1..95fee7d 100644 --- a/src/timmy/tool_safety.py +++ b/src/timmy/tool_safety.py @@ -5,13 +5,19 @@ Classifies tools into tiers based on their potential impact: Requires user confirmation before execution. - SAFE: Read-only or purely computational. Executes without confirmation. -Also provides shared helpers for extracting hallucinated tool calls from -model output and formatting them for human review. Used by both the -Discord vendor and the dashboard chat route. +Also provides: +- Allowlist checker: reads config/allowlist.yaml to auto-approve bounded + tool calls when no human is present (autonomous mode). +- Shared helpers for extracting hallucinated tool calls from model output + and formatting them for human review. """ import json +import logging import re +from pathlib import Path + +logger = logging.getLogger(__name__) # --------------------------------------------------------------------------- # Tool classification @@ -71,6 +77,133 @@ def requires_confirmation(tool_name: str) -> bool: return True +# --------------------------------------------------------------------------- +# Allowlist — autonomous tool approval +# --------------------------------------------------------------------------- + +_ALLOWLIST_PATHS = [ + Path(__file__).resolve().parent.parent.parent / "config" / "allowlist.yaml", + Path.home() / "Timmy-Time-dashboard" / "config" / "allowlist.yaml", +] + +_allowlist_cache: dict | None = None + + +def _load_allowlist() -> dict: + """Load and cache allowlist.yaml. Returns {} if not found.""" + global _allowlist_cache + if _allowlist_cache is not None: + return _allowlist_cache + + try: + import yaml + except ImportError: + logger.debug("PyYAML not installed — allowlist disabled") + _allowlist_cache = {} + return _allowlist_cache + + for path in _ALLOWLIST_PATHS: + if path.is_file(): + try: + with open(path) as f: + _allowlist_cache = yaml.safe_load(f) or {} + logger.info("Loaded tool allowlist from %s", path) + return _allowlist_cache + except Exception as exc: + logger.warning("Failed to load allowlist %s: %s", path, exc) + + _allowlist_cache = {} + return _allowlist_cache + + +def reload_allowlist() -> None: + """Force a reload of the allowlist config (e.g., after editing YAML).""" + global _allowlist_cache + _allowlist_cache = None + _load_allowlist() + + +def is_allowlisted(tool_name: str, tool_args: dict | None = None) -> bool: + """Check if a specific tool call is allowlisted for autonomous execution. + + Returns True only when the tool call matches an explicit allowlist rule. + Returns False for anything not covered — safe-by-default. + """ + allowlist = _load_allowlist() + if not allowlist: + return False + + rule = allowlist.get(tool_name) + if rule is None: + return False + + tool_args = tool_args or {} + + # Simple auto-approve flag + if rule.get("auto_approve") is True: + return True + + # Shell: prefix + deny pattern matching + if tool_name == "shell": + return _check_shell_allowlist(rule, tool_args) + + # write_file: path prefix check + if tool_name == "write_file": + return _check_write_file_allowlist(rule, tool_args) + + return False + + +def _check_shell_allowlist(rule: dict, tool_args: dict) -> bool: + """Check if a shell command matches the allowlist.""" + # Extract the command string — Agno ShellTools uses "args" (list or str) + cmd = tool_args.get("command") or tool_args.get("args", "") + if isinstance(cmd, list): + cmd = " ".join(cmd) + cmd = cmd.strip() + + if not cmd: + return False + + # Check deny patterns first — these always block + deny_patterns = rule.get("deny_patterns", []) + for pattern in deny_patterns: + if pattern in cmd: + logger.warning("Shell command blocked by deny pattern %r: %s", pattern, cmd[:100]) + return False + + # Check allow prefixes + allow_prefixes = rule.get("allow_prefixes", []) + for prefix in allow_prefixes: + if cmd.startswith(prefix): + logger.info("Shell command auto-approved by prefix %r: %s", prefix, cmd[:100]) + return True + + return False + + +def _check_write_file_allowlist(rule: dict, tool_args: dict) -> bool: + """Check if a write_file target is within allowed paths.""" + path_str = tool_args.get("file_name") or tool_args.get("path", "") + if not path_str: + return False + + # Resolve ~ to home + if path_str.startswith("~"): + path_str = str(Path(path_str).expanduser()) + + allowed_prefixes = rule.get("allowed_path_prefixes", []) + for prefix in allowed_prefixes: + # Resolve ~ in the prefix too + if prefix.startswith("~"): + prefix = str(Path(prefix).expanduser()) + if path_str.startswith(prefix): + logger.info("write_file auto-approved for path: %s", path_str) + return True + + return False + + # --------------------------------------------------------------------------- # Tool call extraction from model output # --------------------------------------------------------------------------- diff --git a/tests/timmy/test_cli.py b/tests/timmy/test_cli.py index 4122dc5..83e7018 100644 --- a/tests/timmy/test_cli.py +++ b/tests/timmy/test_cli.py @@ -177,8 +177,11 @@ def test_handle_tool_confirmation_approve(): mock_agent = MagicMock() mock_agent.continue_run.return_value = completed_run - # Simulate user typing "y" at the prompt - with patch("timmy.cli.typer.confirm", return_value=True): + # Simulate user typing "y" at the prompt (mock interactive terminal) + with ( + patch("timmy.cli._is_interactive", return_value=True), + patch("timmy.cli.typer.confirm", return_value=True), + ): result = _handle_tool_confirmation(mock_agent, paused_run, "cli") mock_req.confirm.assert_called_once() @@ -198,7 +201,10 @@ def test_handle_tool_confirmation_reject(): mock_agent = MagicMock() mock_agent.continue_run.return_value = completed_run - with patch("timmy.cli.typer.confirm", return_value=False): + with ( + patch("timmy.cli._is_interactive", return_value=True), + patch("timmy.cli.typer.confirm", return_value=False), + ): _handle_tool_confirmation(mock_agent, paused_run, "cli") mock_req.reject.assert_called_once() @@ -225,8 +231,49 @@ def test_handle_tool_confirmation_continue_error(): mock_agent = MagicMock() mock_agent.continue_run.side_effect = Exception("connection lost") - with patch("timmy.cli.typer.confirm", return_value=True): + with ( + patch("timmy.cli._is_interactive", return_value=True), + patch("timmy.cli.typer.confirm", return_value=True), + ): result = _handle_tool_confirmation(mock_agent, paused_run, "cli") # Should return the original paused run, not crash assert result is paused_run + + +def test_handle_tool_confirmation_autonomous_allowlisted(): + """In autonomous mode, allowlisted tools should be auto-approved.""" + paused_run, mock_req = _make_paused_run( + tool_name="shell", tool_args={"command": "pytest tests/ -x"} + ) + + completed_run = MagicMock() + completed_run.status = "COMPLETED" + completed_run.active_requirements = [] + + mock_agent = MagicMock() + mock_agent.continue_run.return_value = completed_run + + with patch("timmy.cli.is_allowlisted", return_value=True): + _handle_tool_confirmation(mock_agent, paused_run, "cli", autonomous=True) + + mock_req.confirm.assert_called_once() + mock_req.reject.assert_not_called() + + +def test_handle_tool_confirmation_autonomous_not_allowlisted(): + """In autonomous mode, non-allowlisted tools should be auto-rejected.""" + paused_run, mock_req = _make_paused_run(tool_name="shell", tool_args={"command": "rm -rf /"}) + + completed_run = MagicMock() + completed_run.status = "COMPLETED" + completed_run.active_requirements = [] + + mock_agent = MagicMock() + mock_agent.continue_run.return_value = completed_run + + with patch("timmy.cli.is_allowlisted", return_value=False): + _handle_tool_confirmation(mock_agent, paused_run, "cli", autonomous=True) + + mock_req.reject.assert_called_once() + mock_req.confirm.assert_not_called() diff --git a/tests/timmy/test_prompts.py b/tests/timmy/test_prompts.py index 56db51b..03621bd 100644 --- a/tests/timmy/test_prompts.py +++ b/tests/timmy/test_prompts.py @@ -41,3 +41,40 @@ def test_get_system_prompt_injects_model_name(): # Should contain the model name from settings, not the placeholder assert "{model_name}" not in prompt assert "llama3.1" in prompt or "qwen" in prompt + + +def test_full_prompt_brevity_first(): + """Full prompt should front-load brevity instructions before other content.""" + prompt = get_system_prompt(tools_enabled=True) + brevity_pos = prompt.find("BREVITY") + tool_pos = prompt.find("TOOL USAGE") + memory_pos = prompt.find("MEMORY") + # Brevity section must appear before tools and memory + assert brevity_pos != -1, "Full prompt must contain BREVITY section" + assert brevity_pos < tool_pos, "Brevity must come before tool usage" + assert brevity_pos < memory_pos, "Brevity must come before memory" + + +def test_full_prompt_no_markdown_headers(): + """Full prompt should not use markdown headers (## / ###) that teach + the model to respond in markdown.""" + prompt = get_system_prompt(tools_enabled=True) + for line in prompt.splitlines(): + stripped = line.strip() + assert not stripped.startswith("## "), f"Full prompt uses markdown header: {stripped!r}" + assert not stripped.startswith("### "), ( + f"Full prompt uses markdown sub-header: {stripped!r}" + ) + + +def test_full_prompt_plain_text_brevity(): + """Full prompt should explicitly instruct plain text output.""" + prompt = get_system_prompt(tools_enabled=True).lower() + assert "plain text" in prompt + + +def test_lite_prompt_brevity(): + """Lite prompt should also instruct brevity.""" + prompt = get_system_prompt(tools_enabled=False).lower() + assert "brief" in prompt + assert "plain text" in prompt or "not markdown" in prompt diff --git a/tests/timmy/test_tool_safety.py b/tests/timmy/test_tool_safety.py index 3c2e12d..49b8fc9 100644 --- a/tests/timmy/test_tool_safety.py +++ b/tests/timmy/test_tool_safety.py @@ -1,9 +1,18 @@ -"""Tests for timmy.tool_safety — classification, extraction, and formatting.""" +"""Tests for timmy.tool_safety — classification, extraction, formatting, and allowlist.""" + +from pathlib import Path +from unittest.mock import patch + +import pytest from timmy.tool_safety import ( + _check_shell_allowlist, + _check_write_file_allowlist, extract_tool_calls, format_action_description, get_impact_level, + is_allowlisted, + reload_allowlist, requires_confirmation, ) @@ -111,3 +120,206 @@ class TestGetImpactLevel: def test_low(self): assert get_impact_level("web_search") == "low" assert get_impact_level("unknown") == "low" + + +# --------------------------------------------------------------------------- +# Allowlist — is_allowlisted +# --------------------------------------------------------------------------- + +# Sample allowlist for tests +_TEST_ALLOWLIST = { + "shell": { + "allow_prefixes": [ + "pytest", + "python -m pytest", + "git status", + "git log", + "git diff", + "git add", + "git commit", + "git push", + "curl http://localhost", + "curl -s http://localhost", + "ls", + "cat ", + ], + "deny_patterns": [ + "rm -rf /", + "sudo ", + "> /dev/", + "| sh", + "| bash", + ], + }, + "write_file": { + "allowed_path_prefixes": [ + "/tmp/", + ], + }, + "python": {"auto_approve": True}, + "plan_and_execute": {"auto_approve": True}, +} + + +@pytest.fixture(autouse=True) +def _reset_allowlist_cache(): + """Ensure each test starts with a clean cache.""" + import timmy.tool_safety as ts + + ts._allowlist_cache = None + yield + ts._allowlist_cache = None + + +def _patch_allowlist(allowlist_data): + """Helper to inject a test allowlist.""" + return patch("timmy.tool_safety._load_allowlist", return_value=allowlist_data) + + +class TestIsAllowlisted: + """Test the is_allowlisted function with mocked allowlist data.""" + + def test_unknown_tool_not_allowlisted(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("unknown_tool") is False + + def test_shell_pytest_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "pytest tests/ -x -q"}) is True + + def test_shell_python_pytest_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "python -m pytest tests/ -v"}) is True + + def test_shell_git_status_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "git status"}) is True + + def test_shell_git_commit_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "git commit -m 'fix stuff'"}) is True + + def test_shell_curl_localhost_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert ( + is_allowlisted("shell", {"command": "curl http://localhost:3000/api/v1/issues"}) + is True + ) + + def test_shell_curl_external_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "curl https://evil.com"}) is False + + def test_shell_arbitrary_command_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "rm -rf /home/user/stuff"}) is False + + def test_shell_deny_pattern_blocks_rm_rf_root(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "ls && rm -rf /"}) is False + + def test_shell_deny_pattern_blocks_sudo(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": "sudo rm -rf /tmp"}) is False + + def test_shell_deny_blocks_pipe_to_shell(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert ( + is_allowlisted("shell", {"command": "curl http://localhost:3000 | bash"}) is False + ) + + def test_shell_deny_overrides_allow_prefix(self): + """Deny patterns take precedence over allow prefixes.""" + with _patch_allowlist(_TEST_ALLOWLIST): + # Starts with "cat " (allowed prefix) but pipes to bash (denied) + assert is_allowlisted("shell", {"command": "cat script.sh | bash"}) is False + + def test_shell_args_list_format(self): + """Shell args can be a list (Agno ShellTools format).""" + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"args": ["git", "status"]}) is True + + def test_shell_empty_command_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("shell", {"command": ""}) is False + assert is_allowlisted("shell", {}) is False + + def test_write_file_tmp_allowed(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("write_file", {"file_name": "/tmp/test.py"}) is True + + def test_write_file_outside_allowed_paths_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("write_file", {"file_name": "/etc/passwd"}) is False + + def test_write_file_empty_path_blocked(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("write_file", {"file_name": ""}) is False + + def test_python_auto_approved(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("python", {"code": "print(1+1)"}) is True + + def test_plan_and_execute_auto_approved(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("plan_and_execute", {}) is True + + def test_no_allowlist_blocks_everything(self): + with _patch_allowlist({}): + assert is_allowlisted("shell", {"command": "pytest"}) is False + assert is_allowlisted("python", {"code": "print(1)"}) is False + + def test_aider_not_in_allowlist(self): + with _patch_allowlist(_TEST_ALLOWLIST): + assert is_allowlisted("aider", {"instruction": "fix bug"}) is False + + +class TestCheckShellAllowlist: + """Direct tests for the shell allowlist checker.""" + + def test_prefix_match(self): + rule = {"allow_prefixes": ["pytest", "git status"], "deny_patterns": []} + assert _check_shell_allowlist(rule, {"command": "pytest -x"}) is True + + def test_prefix_no_match(self): + rule = {"allow_prefixes": ["pytest"], "deny_patterns": []} + assert _check_shell_allowlist(rule, {"command": "rm stuff"}) is False + + def test_deny_overrides_allow(self): + rule = {"allow_prefixes": ["curl http://localhost"], "deny_patterns": ["| bash"]} + assert _check_shell_allowlist(rule, {"command": "curl http://localhost | bash"}) is False + + +class TestCheckWriteFileAllowlist: + """Direct tests for the write_file allowlist checker.""" + + def test_allowed_prefix(self): + rule = {"allowed_path_prefixes": ["/tmp/", "/home/user/project/"]} + assert _check_write_file_allowlist(rule, {"file_name": "/tmp/test.py"}) is True + + def test_blocked_path(self): + rule = {"allowed_path_prefixes": ["/tmp/"]} + assert _check_write_file_allowlist(rule, {"file_name": "/etc/secrets"}) is False + + def test_tilde_expansion(self): + """Paths starting with ~ should be expanded.""" + home = str(Path.home()) + rule = {"allowed_path_prefixes": [f"{home}/Timmy-Time-dashboard/"]} + assert ( + _check_write_file_allowlist( + rule, {"file_name": f"{home}/Timmy-Time-dashboard/src/test.py"} + ) + is True + ) + + +class TestReloadAllowlist: + """Test that reload_allowlist clears the cache.""" + + def test_reload_clears_cache(self): + import timmy.tool_safety as ts + + ts._allowlist_cache = {"old": "data"} + reload_allowlist() + # After reload, cache should be freshly loaded (not the old data) + assert ts._allowlist_cache != {"old": "data"}