forked from Rockachopa/Timmy-time-dashboard
[loop-cycle-1] feat: tool allowlist for autonomous operation (#69)
Add config/allowlist.yaml — YAML-driven gate that auto-approves bounded tool calls when no human is present. When Timmy runs with --autonomous or stdin is not a terminal, tool calls are checked against allowlist: matched → auto-approved, else → rejected. Changes: - config/allowlist.yaml: shell prefixes, deny patterns, path rules - tool_safety.py: is_allowlisted() checks tools against YAML rules - cli.py: --autonomous flag, _is_interactive() detection - 44 new allowlist tests, 8 updated CLI tests Closes #69
This commit is contained in:
@@ -304,7 +304,7 @@ def create_timmy(
|
||||
description=full_prompt,
|
||||
add_history_to_context=True,
|
||||
num_history_runs=20,
|
||||
markdown=True,
|
||||
markdown=False,
|
||||
tools=tools_list if tools_list else None,
|
||||
tool_call_limit=settings.max_agent_steps if use_tools else None,
|
||||
telemetry=settings.telemetry_enabled,
|
||||
|
||||
@@ -79,7 +79,7 @@ class BaseAgent(ABC):
|
||||
tools=tool_instances if tool_instances else None,
|
||||
add_history_to_context=True,
|
||||
num_history_runs=self.max_history,
|
||||
markdown=True,
|
||||
markdown=False,
|
||||
telemetry=settings.telemetry_enabled,
|
||||
)
|
||||
|
||||
|
||||
@@ -1,11 +1,12 @@
|
||||
import logging
|
||||
import subprocess
|
||||
import sys
|
||||
|
||||
import typer
|
||||
|
||||
from timmy.agent import create_timmy
|
||||
from timmy.prompts import STATUS_PROMPT
|
||||
from timmy.tool_safety import format_action_description, get_impact_level
|
||||
from timmy.tool_safety import format_action_description, get_impact_level, is_allowlisted
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
@@ -30,15 +31,26 @@ _MODEL_SIZE_OPTION = typer.Option(
|
||||
)
|
||||
|
||||
|
||||
def _handle_tool_confirmation(agent, run_output, session_id: str):
|
||||
def _is_interactive() -> bool:
|
||||
"""Return True if stdin is a real terminal (human present)."""
|
||||
return hasattr(sys.stdin, "isatty") and sys.stdin.isatty()
|
||||
|
||||
|
||||
def _handle_tool_confirmation(agent, run_output, session_id: str, *, autonomous: bool = False):
|
||||
"""Prompt user to approve/reject dangerous tool calls.
|
||||
|
||||
When Agno pauses a run because a tool requires confirmation, this
|
||||
function displays the action, asks for approval via stdin, and
|
||||
resumes or rejects the run accordingly.
|
||||
|
||||
When autonomous=True (or stdin is not a terminal), tool calls are
|
||||
checked against config/allowlist.yaml instead of prompting.
|
||||
Allowlisted calls are auto-approved; everything else is auto-rejected.
|
||||
|
||||
Returns the final RunOutput after all confirmations are resolved.
|
||||
"""
|
||||
interactive = _is_interactive() and not autonomous
|
||||
|
||||
max_rounds = 10 # safety limit
|
||||
for _ in range(max_rounds):
|
||||
status = getattr(run_output, "status", None)
|
||||
@@ -58,22 +70,34 @@ def _handle_tool_confirmation(agent, run_output, session_id: str):
|
||||
tool_name = getattr(te, "tool_name", "unknown")
|
||||
tool_args = getattr(te, "tool_args", {}) or {}
|
||||
|
||||
description = format_action_description(tool_name, tool_args)
|
||||
impact = get_impact_level(tool_name)
|
||||
if interactive:
|
||||
# Human present — prompt for approval
|
||||
description = format_action_description(tool_name, tool_args)
|
||||
impact = get_impact_level(tool_name)
|
||||
|
||||
typer.echo()
|
||||
typer.echo(typer.style("Tool confirmation required", bold=True))
|
||||
typer.echo(f" Impact: {impact.upper()}")
|
||||
typer.echo(f" {description}")
|
||||
typer.echo()
|
||||
typer.echo()
|
||||
typer.echo(typer.style("Tool confirmation required", bold=True))
|
||||
typer.echo(f" Impact: {impact.upper()}")
|
||||
typer.echo(f" {description}")
|
||||
typer.echo()
|
||||
|
||||
approved = typer.confirm("Allow this action?", default=False)
|
||||
if approved:
|
||||
req.confirm()
|
||||
logger.info("CLI: approved %s", tool_name)
|
||||
approved = typer.confirm("Allow this action?", default=False)
|
||||
if approved:
|
||||
req.confirm()
|
||||
logger.info("CLI: approved %s", tool_name)
|
||||
else:
|
||||
req.reject(note="User rejected from CLI")
|
||||
logger.info("CLI: rejected %s", tool_name)
|
||||
else:
|
||||
req.reject(note="User rejected from CLI")
|
||||
logger.info("CLI: rejected %s", tool_name)
|
||||
# Autonomous mode — check allowlist
|
||||
if is_allowlisted(tool_name, tool_args):
|
||||
req.confirm()
|
||||
logger.info("AUTO-APPROVED (allowlist): %s", tool_name)
|
||||
else:
|
||||
req.reject(note="Auto-rejected: not in allowlist")
|
||||
logger.info(
|
||||
"AUTO-REJECTED (not allowlisted): %s %s", tool_name, str(tool_args)[:100]
|
||||
)
|
||||
|
||||
# Resume the run so the agent sees the confirmation result
|
||||
try:
|
||||
@@ -133,11 +157,21 @@ def chat(
|
||||
"--session-id",
|
||||
help="Use a specific session ID for this conversation",
|
||||
),
|
||||
autonomous: bool = typer.Option(
|
||||
False,
|
||||
"--autonomous",
|
||||
"-a",
|
||||
help="Autonomous mode: auto-approve allowlisted tools, reject the rest (no stdin prompts)",
|
||||
),
|
||||
):
|
||||
"""Send a message to Timmy.
|
||||
|
||||
Conversation history persists across invocations. Use --new to start fresh,
|
||||
or --session-id to use a specific session.
|
||||
|
||||
Use --autonomous for non-interactive contexts (scripts, dev loops). Tool
|
||||
calls are checked against config/allowlist.yaml — allowlisted operations
|
||||
execute automatically, everything else is safely rejected.
|
||||
"""
|
||||
import uuid
|
||||
|
||||
@@ -153,7 +187,7 @@ def chat(
|
||||
run_output = timmy.run(message, stream=False, session_id=session_id)
|
||||
|
||||
# Handle paused runs — dangerous tools need user approval
|
||||
run_output = _handle_tool_confirmation(timmy, run_output, session_id)
|
||||
run_output = _handle_tool_confirmation(timmy, run_output, session_id, autonomous=autonomous)
|
||||
|
||||
# Print the final response
|
||||
content = run_output.content if hasattr(run_output, "content") else str(run_output)
|
||||
|
||||
@@ -38,89 +38,48 @@ Rules:
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
SYSTEM_PROMPT_FULL = """You are a local AI assistant running on the {model_name} model via Ollama.
|
||||
No cloud dependencies. Be brief. Plain text. Short answers unless depth is needed.
|
||||
No cloud dependencies.
|
||||
|
||||
## Your Three-Tier Memory System
|
||||
|
||||
### Tier 1: Hot Memory (Always Loaded)
|
||||
- MEMORY.md — Current status, rules, user profile summary
|
||||
- Loaded into every session automatically
|
||||
|
||||
### Tier 2: Structured Vault (Persistent)
|
||||
- memory/self/ — User profile, methodology
|
||||
- memory/notes/ — Session logs, research, lessons learned
|
||||
- memory/aar/ — After-action reviews
|
||||
- Append-only, date-stamped, human-readable
|
||||
|
||||
### Tier 3: Semantic Search (Vector Recall)
|
||||
- Indexed from all vault files
|
||||
- Similarity-based retrieval
|
||||
- Use `memory_search` tool to find relevant past context
|
||||
|
||||
## Reasoning in Complex Situations
|
||||
|
||||
When faced with uncertainty, complexity, or ambiguous requests:
|
||||
|
||||
1. **THINK STEP-BY-STEP** — Break down the problem before acting
|
||||
2. **STATE UNCERTAINTY** — If you're unsure, say "I'm uncertain about X because..."
|
||||
3. **CONSIDER ALTERNATIVES** — Present 2-3 options when the path isn't clear
|
||||
4. **ASK FOR CLARIFICATION** — If a request is ambiguous, ask before guessing wrong
|
||||
5. **DOCUMENT YOUR REASONING** — When making significant choices, explain WHY
|
||||
|
||||
## Tool Usage Guidelines
|
||||
|
||||
### When NOT to use tools:
|
||||
- General knowledge → Answer from training
|
||||
- Greetings → Respond conversationally
|
||||
|
||||
### When TO use tools:
|
||||
|
||||
- **calculator** — ANY arithmetic
|
||||
- **web_search** — Current events, real-time data, news
|
||||
- **read_file** — User explicitly requests file reading
|
||||
- **write_file** — User explicitly requests saving content
|
||||
- **python** — Code execution, data processing
|
||||
- **shell** — System operations (explicit user request)
|
||||
- **memory_search** — Finding past context
|
||||
|
||||
## Multi-Step Task Execution
|
||||
|
||||
CRITICAL RULE: When a task requires multiple tool calls, you MUST call each
|
||||
tool in sequence. Do NOT stop after one tool call and report partial results.
|
||||
|
||||
When a task requires multiple tool calls:
|
||||
1. Call the first tool and wait for results
|
||||
2. After receiving results, immediately call the next required tool
|
||||
3. Keep calling tools until the ENTIRE task is complete
|
||||
4. If a tool fails, try an alternative approach
|
||||
5. Only after ALL steps are done, summarize what you accomplished
|
||||
|
||||
Example: "Search for AI news and save to a file"
|
||||
- Step 1: Call web_search → get results
|
||||
- Step 2: Call write_file with the results → confirm saved
|
||||
- Step 3: THEN respond to the user with a summary
|
||||
DO NOT stop after Step 1 and just show search results.
|
||||
|
||||
For complex tasks with 3+ steps that may take time, use the plan_and_execute
|
||||
tool to run them in the background with progress tracking.
|
||||
|
||||
## Important: Response Style
|
||||
|
||||
- Be brief by default. Short questions get short answers.
|
||||
- Expand only when the topic genuinely requires depth or when asked.
|
||||
- Speak plainly. Prefer short sentences. Answer the question that was asked
|
||||
before the question that wasn't.
|
||||
- Do not use markdown formatting (tables, headers, emoji, bullet lists) unless
|
||||
you are presenting genuinely structured data. Plain text is the default.
|
||||
- Never narrate your reasoning process. Just give the answer.
|
||||
- Never show raw tool call JSON or function syntax in responses.
|
||||
- Use the user's name if known.
|
||||
- If a request is ambiguous, ask a brief clarifying question before guessing.
|
||||
- When you state a fact, commit to it.
|
||||
- Do NOT end responses with generic chatbot phrases like "I'm here to help" or
|
||||
"feel free to ask."
|
||||
- When your values conflict (e.g. honesty vs. helpfulness), lead with honesty.
|
||||
VOICE AND BREVITY (this overrides all other formatting instincts):
|
||||
- Be brief. Short questions get short answers. One sentence if one sentence
|
||||
suffices. Expand ONLY when the user asks for depth or the topic demands it.
|
||||
- Plain text only. No markdown headers, bold, tables, emoji, or bullet lists
|
||||
unless presenting genuinely structured data (a real table, a real list).
|
||||
- Speak plainly. Short sentences. Answer the question that was asked before
|
||||
the question that wasn't.
|
||||
- Never narrate your reasoning. Just give the answer.
|
||||
- Do not end with filler ("Let me know!", "Happy to help!", "Feel free...").
|
||||
- Sometimes the right answer is nothing. Do not fill silence with noise.
|
||||
|
||||
HONESTY:
|
||||
- If you don't know, say "I don't know." Don't dress a guess in confidence.
|
||||
- When uncertain, say so proportionally. "I think" and "I know" are different.
|
||||
- When your values conflict, lead with honesty.
|
||||
- Never fabricate tool output. Call the tool and wait.
|
||||
- If a tool errors, report the exact error.
|
||||
|
||||
MEMORY (three tiers):
|
||||
- Tier 1: MEMORY.md (hot, always loaded)
|
||||
- Tier 2: memory/ vault (structured, append-only, date-stamped)
|
||||
- Tier 3: semantic search (use memory_search tool)
|
||||
|
||||
TOOL USAGE:
|
||||
- Arithmetic: always use calculator. Never compute in your head.
|
||||
- Past context: memory_search
|
||||
- Current events: web_search
|
||||
- File ops, code, shell: only on explicit request
|
||||
- General knowledge / greetings: no tools needed
|
||||
|
||||
MULTI-STEP TASKS:
|
||||
When a task needs multiple tool calls, complete ALL steps before responding.
|
||||
Do not stop after one call and report partial results. If a tool fails, try
|
||||
an alternative. Summarize only after the full task is done.
|
||||
|
||||
IDENTITY:
|
||||
- Use the user's name if known.
|
||||
- If a request is ambiguous, ask one brief clarifying question.
|
||||
- When you state a fact, commit to it.
|
||||
- Never show raw tool call JSON or function syntax in responses.
|
||||
"""
|
||||
|
||||
# Default to lite for safety
|
||||
|
||||
@@ -5,13 +5,19 @@ Classifies tools into tiers based on their potential impact:
|
||||
Requires user confirmation before execution.
|
||||
- SAFE: Read-only or purely computational. Executes without confirmation.
|
||||
|
||||
Also provides shared helpers for extracting hallucinated tool calls from
|
||||
model output and formatting them for human review. Used by both the
|
||||
Discord vendor and the dashboard chat route.
|
||||
Also provides:
|
||||
- Allowlist checker: reads config/allowlist.yaml to auto-approve bounded
|
||||
tool calls when no human is present (autonomous mode).
|
||||
- Shared helpers for extracting hallucinated tool calls from model output
|
||||
and formatting them for human review.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from pathlib import Path
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tool classification
|
||||
@@ -71,6 +77,133 @@ def requires_confirmation(tool_name: str) -> bool:
|
||||
return True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Allowlist — autonomous tool approval
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_ALLOWLIST_PATHS = [
|
||||
Path(__file__).resolve().parent.parent.parent / "config" / "allowlist.yaml",
|
||||
Path.home() / "Timmy-Time-dashboard" / "config" / "allowlist.yaml",
|
||||
]
|
||||
|
||||
_allowlist_cache: dict | None = None
|
||||
|
||||
|
||||
def _load_allowlist() -> dict:
|
||||
"""Load and cache allowlist.yaml. Returns {} if not found."""
|
||||
global _allowlist_cache
|
||||
if _allowlist_cache is not None:
|
||||
return _allowlist_cache
|
||||
|
||||
try:
|
||||
import yaml
|
||||
except ImportError:
|
||||
logger.debug("PyYAML not installed — allowlist disabled")
|
||||
_allowlist_cache = {}
|
||||
return _allowlist_cache
|
||||
|
||||
for path in _ALLOWLIST_PATHS:
|
||||
if path.is_file():
|
||||
try:
|
||||
with open(path) as f:
|
||||
_allowlist_cache = yaml.safe_load(f) or {}
|
||||
logger.info("Loaded tool allowlist from %s", path)
|
||||
return _allowlist_cache
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to load allowlist %s: %s", path, exc)
|
||||
|
||||
_allowlist_cache = {}
|
||||
return _allowlist_cache
|
||||
|
||||
|
||||
def reload_allowlist() -> None:
|
||||
"""Force a reload of the allowlist config (e.g., after editing YAML)."""
|
||||
global _allowlist_cache
|
||||
_allowlist_cache = None
|
||||
_load_allowlist()
|
||||
|
||||
|
||||
def is_allowlisted(tool_name: str, tool_args: dict | None = None) -> bool:
|
||||
"""Check if a specific tool call is allowlisted for autonomous execution.
|
||||
|
||||
Returns True only when the tool call matches an explicit allowlist rule.
|
||||
Returns False for anything not covered — safe-by-default.
|
||||
"""
|
||||
allowlist = _load_allowlist()
|
||||
if not allowlist:
|
||||
return False
|
||||
|
||||
rule = allowlist.get(tool_name)
|
||||
if rule is None:
|
||||
return False
|
||||
|
||||
tool_args = tool_args or {}
|
||||
|
||||
# Simple auto-approve flag
|
||||
if rule.get("auto_approve") is True:
|
||||
return True
|
||||
|
||||
# Shell: prefix + deny pattern matching
|
||||
if tool_name == "shell":
|
||||
return _check_shell_allowlist(rule, tool_args)
|
||||
|
||||
# write_file: path prefix check
|
||||
if tool_name == "write_file":
|
||||
return _check_write_file_allowlist(rule, tool_args)
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _check_shell_allowlist(rule: dict, tool_args: dict) -> bool:
|
||||
"""Check if a shell command matches the allowlist."""
|
||||
# Extract the command string — Agno ShellTools uses "args" (list or str)
|
||||
cmd = tool_args.get("command") or tool_args.get("args", "")
|
||||
if isinstance(cmd, list):
|
||||
cmd = " ".join(cmd)
|
||||
cmd = cmd.strip()
|
||||
|
||||
if not cmd:
|
||||
return False
|
||||
|
||||
# Check deny patterns first — these always block
|
||||
deny_patterns = rule.get("deny_patterns", [])
|
||||
for pattern in deny_patterns:
|
||||
if pattern in cmd:
|
||||
logger.warning("Shell command blocked by deny pattern %r: %s", pattern, cmd[:100])
|
||||
return False
|
||||
|
||||
# Check allow prefixes
|
||||
allow_prefixes = rule.get("allow_prefixes", [])
|
||||
for prefix in allow_prefixes:
|
||||
if cmd.startswith(prefix):
|
||||
logger.info("Shell command auto-approved by prefix %r: %s", prefix, cmd[:100])
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
def _check_write_file_allowlist(rule: dict, tool_args: dict) -> bool:
|
||||
"""Check if a write_file target is within allowed paths."""
|
||||
path_str = tool_args.get("file_name") or tool_args.get("path", "")
|
||||
if not path_str:
|
||||
return False
|
||||
|
||||
# Resolve ~ to home
|
||||
if path_str.startswith("~"):
|
||||
path_str = str(Path(path_str).expanduser())
|
||||
|
||||
allowed_prefixes = rule.get("allowed_path_prefixes", [])
|
||||
for prefix in allowed_prefixes:
|
||||
# Resolve ~ in the prefix too
|
||||
if prefix.startswith("~"):
|
||||
prefix = str(Path(prefix).expanduser())
|
||||
if path_str.startswith(prefix):
|
||||
logger.info("write_file auto-approved for path: %s", path_str)
|
||||
return True
|
||||
|
||||
return False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Tool call extraction from model output
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
Reference in New Issue
Block a user