Refactor terminal tool command approval process and enhance CLI feedback
- Updated the terminal tool's command approval flow to improve user interaction when executing potentially dangerous commands, replacing the previous confirmation method with a clear explanation and instructions for adding commands to the allowlist. - Removed the internal `force` parameter from the model API, ensuring that dangerous command approvals are handled solely through user prompts. - Enhanced the CLI to provide better feedback regarding tool availability, including improved messaging for enabled and disabled toolsets. - Updated AGENTS.md to reflect changes in the command approval process and configuration instructions.
This commit is contained in:
@@ -281,8 +281,8 @@ The terminal tool includes safety checks for potentially destructive commands (e
|
|||||||
|
|
||||||
**Approval Flow (Messaging):**
|
**Approval Flow (Messaging):**
|
||||||
- Command is blocked with explanation
|
- Command is blocked with explanation
|
||||||
- Agent explains and asks user to confirm
|
- Agent explains the command was blocked for safety
|
||||||
- If user says "yes/approve/do it", agent retries with `force=True`
|
- User must add the pattern to their allowlist via `hermes config edit` or run the command directly on their machine
|
||||||
|
|
||||||
**Configuration:**
|
**Configuration:**
|
||||||
- `command_allowlist` in `~/.hermes/config.yaml` stores permanently allowed patterns
|
- `command_allowlist` in `~/.hermes/config.yaml` stores permanently allowed patterns
|
||||||
|
|||||||
89
cli.py
89
cli.py
@@ -126,8 +126,20 @@ def load_cli_config() -> Dict[str, Any]:
|
|||||||
try:
|
try:
|
||||||
with open(config_path, "r") as f:
|
with open(config_path, "r") as f:
|
||||||
file_config = yaml.safe_load(f) or {}
|
file_config = yaml.safe_load(f) or {}
|
||||||
# Deep merge with defaults
|
|
||||||
|
# Handle model config - can be string (new format) or dict (old format)
|
||||||
|
if "model" in file_config:
|
||||||
|
if isinstance(file_config["model"], str):
|
||||||
|
# New format: model is just a string, convert to dict structure
|
||||||
|
defaults["model"]["default"] = file_config["model"]
|
||||||
|
elif isinstance(file_config["model"], dict):
|
||||||
|
# Old format: model is a dict with default/base_url
|
||||||
|
defaults["model"].update(file_config["model"])
|
||||||
|
|
||||||
|
# Deep merge other keys with defaults
|
||||||
for key in defaults:
|
for key in defaults:
|
||||||
|
if key == "model":
|
||||||
|
continue # Already handled above
|
||||||
if key in file_config:
|
if key in file_config:
|
||||||
if isinstance(defaults[key], dict) and isinstance(file_config[key], dict):
|
if isinstance(defaults[key], dict) and isinstance(file_config[key], dict):
|
||||||
defaults[key].update(file_config[key])
|
defaults[key].update(file_config[key])
|
||||||
@@ -306,9 +318,17 @@ def build_welcome_banner(console: Console, model: str, cwd: str, tools: List[dic
|
|||||||
enabled_toolsets: List of enabled toolset names
|
enabled_toolsets: List of enabled toolset names
|
||||||
session_id: Unique session identifier for logging
|
session_id: Unique session identifier for logging
|
||||||
"""
|
"""
|
||||||
|
from model_tools import check_tool_availability, TOOLSET_REQUIREMENTS
|
||||||
|
|
||||||
tools = tools or []
|
tools = tools or []
|
||||||
enabled_toolsets = enabled_toolsets or []
|
enabled_toolsets = enabled_toolsets or []
|
||||||
|
|
||||||
|
# Get unavailable tools info for coloring
|
||||||
|
_, unavailable_toolsets = check_tool_availability(quiet=True)
|
||||||
|
disabled_tools = set()
|
||||||
|
for item in unavailable_toolsets:
|
||||||
|
disabled_tools.update(item.get("tools", []))
|
||||||
|
|
||||||
# Build the side-by-side content using a table for precise control
|
# Build the side-by-side content using a table for precise control
|
||||||
layout_table = Table.grid(padding=(0, 2))
|
layout_table = Table.grid(padding=(0, 2))
|
||||||
layout_table.add_column("left", justify="center")
|
layout_table.add_column("left", justify="center")
|
||||||
@@ -334,8 +354,10 @@ def build_welcome_banner(console: Console, model: str, cwd: str, tools: List[dic
|
|||||||
right_lines = []
|
right_lines = []
|
||||||
right_lines.append("[bold #FFBF00]Available Tools[/]")
|
right_lines.append("[bold #FFBF00]Available Tools[/]")
|
||||||
|
|
||||||
# Group tools by toolset
|
# Group tools by toolset (include all possible tools, both enabled and disabled)
|
||||||
toolsets_dict = {}
|
toolsets_dict = {}
|
||||||
|
|
||||||
|
# First, add all enabled tools
|
||||||
for tool in tools:
|
for tool in tools:
|
||||||
tool_name = tool["function"]["name"]
|
tool_name = tool["function"]["name"]
|
||||||
toolset = get_toolset_for_tool(tool_name) or "other"
|
toolset = get_toolset_for_tool(tool_name) or "other"
|
||||||
@@ -343,6 +365,17 @@ def build_welcome_banner(console: Console, model: str, cwd: str, tools: List[dic
|
|||||||
toolsets_dict[toolset] = []
|
toolsets_dict[toolset] = []
|
||||||
toolsets_dict[toolset].append(tool_name)
|
toolsets_dict[toolset].append(tool_name)
|
||||||
|
|
||||||
|
# Also add disabled toolsets so they show in the banner
|
||||||
|
for item in unavailable_toolsets:
|
||||||
|
# Map the internal toolset ID to display name
|
||||||
|
toolset_id = item["id"]
|
||||||
|
display_name = f"{toolset_id}_tools" if not toolset_id.endswith("_tools") else toolset_id
|
||||||
|
if display_name not in toolsets_dict:
|
||||||
|
toolsets_dict[display_name] = []
|
||||||
|
for tool_name in item.get("tools", []):
|
||||||
|
if tool_name not in toolsets_dict[display_name]:
|
||||||
|
toolsets_dict[display_name].append(tool_name)
|
||||||
|
|
||||||
# Display tools grouped by toolset (compact format, max 8 groups)
|
# Display tools grouped by toolset (compact format, max 8 groups)
|
||||||
sorted_toolsets = sorted(toolsets_dict.keys())
|
sorted_toolsets = sorted(toolsets_dict.keys())
|
||||||
display_toolsets = sorted_toolsets[:8]
|
display_toolsets = sorted_toolsets[:8]
|
||||||
@@ -350,11 +383,38 @@ def build_welcome_banner(console: Console, model: str, cwd: str, tools: List[dic
|
|||||||
|
|
||||||
for toolset in display_toolsets:
|
for toolset in display_toolsets:
|
||||||
tool_names = toolsets_dict[toolset]
|
tool_names = toolsets_dict[toolset]
|
||||||
# Join tool names with commas, wrap if too long
|
# Color each tool name - red if disabled, normal if enabled
|
||||||
tools_str = ", ".join(sorted(tool_names))
|
colored_names = []
|
||||||
if len(tools_str) > 45:
|
for name in sorted(tool_names):
|
||||||
tools_str = tools_str[:42] + "..."
|
if name in disabled_tools:
|
||||||
right_lines.append(f"[dim #B8860B]{toolset}:[/] [#FFF8DC]{tools_str}[/]")
|
colored_names.append(f"[red]{name}[/]")
|
||||||
|
else:
|
||||||
|
colored_names.append(f"[#FFF8DC]{name}[/]")
|
||||||
|
|
||||||
|
tools_str = ", ".join(colored_names)
|
||||||
|
# Truncate if too long (accounting for markup)
|
||||||
|
if len(", ".join(sorted(tool_names))) > 45:
|
||||||
|
# Rebuild with truncation
|
||||||
|
short_names = []
|
||||||
|
length = 0
|
||||||
|
for name in sorted(tool_names):
|
||||||
|
if length + len(name) + 2 > 42:
|
||||||
|
short_names.append("...")
|
||||||
|
break
|
||||||
|
short_names.append(name)
|
||||||
|
length += len(name) + 2
|
||||||
|
# Re-color the truncated list
|
||||||
|
colored_names = []
|
||||||
|
for name in short_names:
|
||||||
|
if name == "...":
|
||||||
|
colored_names.append("[dim]...[/]")
|
||||||
|
elif name in disabled_tools:
|
||||||
|
colored_names.append(f"[red]{name}[/]")
|
||||||
|
else:
|
||||||
|
colored_names.append(f"[#FFF8DC]{name}[/]")
|
||||||
|
tools_str = ", ".join(colored_names)
|
||||||
|
|
||||||
|
right_lines.append(f"[dim #B8860B]{toolset}:[/] {tools_str}")
|
||||||
|
|
||||||
if remaining_toolsets > 0:
|
if remaining_toolsets > 0:
|
||||||
right_lines.append(f"[dim #B8860B](and {remaining_toolsets} more toolsets...)[/]")
|
right_lines.append(f"[dim #B8860B](and {remaining_toolsets} more toolsets...)[/]")
|
||||||
@@ -509,9 +569,14 @@ class HermesCLI:
|
|||||||
self.verbose = verbose if verbose is not None else CLI_CONFIG["agent"].get("verbose", False)
|
self.verbose = verbose if verbose is not None else CLI_CONFIG["agent"].get("verbose", False)
|
||||||
|
|
||||||
# Configuration - priority: CLI args > env vars > config file
|
# Configuration - priority: CLI args > env vars > config file
|
||||||
self.model = model or os.getenv("LLM_MODEL", CLI_CONFIG["model"]["default"])
|
# Model can come from: CLI arg, LLM_MODEL env, OPENAI_MODEL env (custom endpoint), or config
|
||||||
self.base_url = base_url or os.getenv("OPENROUTER_BASE_URL", CLI_CONFIG["model"]["base_url"])
|
self.model = model or os.getenv("LLM_MODEL") or os.getenv("OPENAI_MODEL") or CLI_CONFIG["model"]["default"]
|
||||||
self.api_key = api_key or os.getenv("OPENROUTER_API_KEY")
|
|
||||||
|
# Base URL: custom endpoint (OPENAI_BASE_URL) takes precedence over OpenRouter
|
||||||
|
self.base_url = base_url or os.getenv("OPENAI_BASE_URL") or os.getenv("OPENROUTER_BASE_URL", CLI_CONFIG["model"]["base_url"])
|
||||||
|
|
||||||
|
# API key: custom endpoint (OPENAI_API_KEY) takes precedence over OpenRouter
|
||||||
|
self.api_key = api_key or os.getenv("OPENAI_API_KEY") or os.getenv("OPENROUTER_API_KEY")
|
||||||
self.max_turns = max_turns if max_turns != 20 else CLI_CONFIG["agent"].get("max_turns", 20)
|
self.max_turns = max_turns if max_turns != 20 else CLI_CONFIG["agent"].get("max_turns", 20)
|
||||||
|
|
||||||
# Parse and validate toolsets
|
# Parse and validate toolsets
|
||||||
@@ -641,7 +706,7 @@ class HermesCLI:
|
|||||||
def _show_status(self):
|
def _show_status(self):
|
||||||
"""Show current status bar."""
|
"""Show current status bar."""
|
||||||
# Get tool count
|
# Get tool count
|
||||||
tools = get_tool_definitions(enabled_toolsets=self.enabled_toolsets)
|
tools = get_tool_definitions(enabled_toolsets=self.enabled_toolsets, quiet_mode=True)
|
||||||
tool_count = len(tools) if tools else 0
|
tool_count = len(tools) if tools else 0
|
||||||
|
|
||||||
# Format model name (shorten if needed)
|
# Format model name (shorten if needed)
|
||||||
@@ -684,7 +749,7 @@ class HermesCLI:
|
|||||||
|
|
||||||
def show_tools(self):
|
def show_tools(self):
|
||||||
"""Display available tools with kawaii ASCII art."""
|
"""Display available tools with kawaii ASCII art."""
|
||||||
tools = get_tool_definitions(enabled_toolsets=self.enabled_toolsets)
|
tools = get_tool_definitions(enabled_toolsets=self.enabled_toolsets, quiet_mode=True)
|
||||||
|
|
||||||
if not tools:
|
if not tools:
|
||||||
print("(;_;) No tools available")
|
print("(;_;) No tools available")
|
||||||
|
|||||||
@@ -274,11 +274,6 @@ def get_terminal_tool_definitions() -> List[Dict[str, Any]]:
|
|||||||
"type": "integer",
|
"type": "integer",
|
||||||
"description": "Command timeout in seconds (optional)",
|
"description": "Command timeout in seconds (optional)",
|
||||||
"minimum": 1
|
"minimum": 1
|
||||||
},
|
|
||||||
"force": {
|
|
||||||
"type": "boolean",
|
|
||||||
"description": "Skip dangerous command safety check. Only use after user explicitly confirms they want to run a blocked command.",
|
|
||||||
"default": False
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"required": ["command"]
|
"required": ["command"]
|
||||||
@@ -644,7 +639,8 @@ def get_tool_definitions(
|
|||||||
if validate_toolset(toolset_name):
|
if validate_toolset(toolset_name):
|
||||||
resolved_tools = resolve_toolset(toolset_name)
|
resolved_tools = resolve_toolset(toolset_name)
|
||||||
tools_to_include.update(resolved_tools)
|
tools_to_include.update(resolved_tools)
|
||||||
print(f"✅ Enabled toolset '{toolset_name}': {', '.join(resolved_tools) if resolved_tools else 'no tools'}")
|
if not quiet_mode:
|
||||||
|
print(f"✅ Enabled toolset '{toolset_name}': {', '.join(resolved_tools) if resolved_tools else 'no tools'}")
|
||||||
else:
|
else:
|
||||||
# Try legacy compatibility
|
# Try legacy compatibility
|
||||||
if toolset_name in ["web_tools", "terminal_tools", "vision_tools", "moa_tools", "image_tools", "skills_tools", "browser_tools", "cronjob_tools"]:
|
if toolset_name in ["web_tools", "terminal_tools", "vision_tools", "moa_tools", "image_tools", "skills_tools", "browser_tools", "cronjob_tools"]:
|
||||||
@@ -666,9 +662,11 @@ def get_tool_definitions(
|
|||||||
}
|
}
|
||||||
legacy_tools = legacy_map.get(toolset_name, [])
|
legacy_tools = legacy_map.get(toolset_name, [])
|
||||||
tools_to_include.update(legacy_tools)
|
tools_to_include.update(legacy_tools)
|
||||||
print(f"✅ Enabled legacy toolset '{toolset_name}': {', '.join(legacy_tools)}")
|
if not quiet_mode:
|
||||||
|
print(f"✅ Enabled legacy toolset '{toolset_name}': {', '.join(legacy_tools)}")
|
||||||
else:
|
else:
|
||||||
print(f"⚠️ Unknown toolset: {toolset_name}")
|
if not quiet_mode:
|
||||||
|
print(f"⚠️ Unknown toolset: {toolset_name}")
|
||||||
elif disabled_toolsets:
|
elif disabled_toolsets:
|
||||||
# Start with all tools from all toolsets, then remove disabled ones
|
# Start with all tools from all toolsets, then remove disabled ones
|
||||||
# Note: Only tools that are part of toolsets are accessible
|
# Note: Only tools that are part of toolsets are accessible
|
||||||
@@ -687,7 +685,8 @@ def get_tool_definitions(
|
|||||||
if validate_toolset(toolset_name):
|
if validate_toolset(toolset_name):
|
||||||
resolved_tools = resolve_toolset(toolset_name)
|
resolved_tools = resolve_toolset(toolset_name)
|
||||||
tools_to_include.difference_update(resolved_tools)
|
tools_to_include.difference_update(resolved_tools)
|
||||||
print(f"🚫 Disabled toolset '{toolset_name}': {', '.join(resolved_tools) if resolved_tools else 'no tools'}")
|
if not quiet_mode:
|
||||||
|
print(f"🚫 Disabled toolset '{toolset_name}': {', '.join(resolved_tools) if resolved_tools else 'no tools'}")
|
||||||
else:
|
else:
|
||||||
# Try legacy compatibility
|
# Try legacy compatibility
|
||||||
if toolset_name in ["web_tools", "terminal_tools", "vision_tools", "moa_tools", "image_tools", "skills_tools", "browser_tools", "cronjob_tools"]:
|
if toolset_name in ["web_tools", "terminal_tools", "vision_tools", "moa_tools", "image_tools", "skills_tools", "browser_tools", "cronjob_tools"]:
|
||||||
@@ -708,9 +707,11 @@ def get_tool_definitions(
|
|||||||
}
|
}
|
||||||
legacy_tools = legacy_map.get(toolset_name, [])
|
legacy_tools = legacy_map.get(toolset_name, [])
|
||||||
tools_to_include.difference_update(legacy_tools)
|
tools_to_include.difference_update(legacy_tools)
|
||||||
print(f"🚫 Disabled legacy toolset '{toolset_name}': {', '.join(legacy_tools)}")
|
if not quiet_mode:
|
||||||
|
print(f"🚫 Disabled legacy toolset '{toolset_name}': {', '.join(legacy_tools)}")
|
||||||
else:
|
else:
|
||||||
print(f"⚠️ Unknown toolset: {toolset_name}")
|
if not quiet_mode:
|
||||||
|
print(f"⚠️ Unknown toolset: {toolset_name}")
|
||||||
else:
|
else:
|
||||||
# No filtering - include all tools from all defined toolsets
|
# No filtering - include all tools from all defined toolsets
|
||||||
from toolsets import get_all_toolsets
|
from toolsets import get_all_toolsets
|
||||||
@@ -781,9 +782,10 @@ def handle_terminal_function_call(function_name: str, function_args: Dict[str, A
|
|||||||
command = function_args.get("command")
|
command = function_args.get("command")
|
||||||
background = function_args.get("background", False)
|
background = function_args.get("background", False)
|
||||||
timeout = function_args.get("timeout")
|
timeout = function_args.get("timeout")
|
||||||
force = function_args.get("force", False) # Skip dangerous command check if user confirmed
|
# Note: force parameter exists internally but is NOT exposed to the model
|
||||||
|
# Dangerous command approval is handled via user prompts only
|
||||||
|
|
||||||
return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id, force=force)
|
return terminal_tool(command=command, background=background, timeout=timeout, task_id=task_id)
|
||||||
|
|
||||||
else:
|
else:
|
||||||
return json.dumps({"error": f"Unknown terminal function: {function_name}"}, ensure_ascii=False)
|
return json.dumps({"error": f"Unknown terminal function: {function_name}"}, ensure_ascii=False)
|
||||||
|
|||||||
@@ -300,11 +300,12 @@ def _prompt_dangerous_approval(command: str, description: str, timeout_seconds:
|
|||||||
os.environ["HERMES_SPINNER_PAUSE"] = "1"
|
os.environ["HERMES_SPINNER_PAUSE"] = "1"
|
||||||
|
|
||||||
try:
|
try:
|
||||||
|
# Use simple ASCII art for compatibility (no ANSI color codes)
|
||||||
print()
|
print()
|
||||||
print(f" ⚠️ \033[33mPotentially dangerous command detected:\033[0m {description}")
|
print(f" ⚠️ DANGEROUS COMMAND: {description}")
|
||||||
print(f" \033[2m{command[:100]}{'...' if len(command) > 100 else ''}\033[0m")
|
print(f" {command[:80]}{'...' if len(command) > 80 else ''}")
|
||||||
print()
|
print()
|
||||||
print(f" [\033[32mo\033[0m]nce | [\033[33ms\033[0m]ession | [\033[36ma\033[0m]lways | [\033[31md\033[0m]eny")
|
print(f" [o]nce | [s]ession | [a]lways | [d]eny")
|
||||||
print()
|
print()
|
||||||
sys.stdout.flush()
|
sys.stdout.flush()
|
||||||
|
|
||||||
@@ -389,14 +390,14 @@ def _check_dangerous_command(command: str, env_type: str) -> dict:
|
|||||||
return {
|
return {
|
||||||
"approved": False,
|
"approved": False,
|
||||||
"pattern_key": pattern_key,
|
"pattern_key": pattern_key,
|
||||||
"message": f"⚠️ This command was blocked because it's potentially dangerous ({description}). If you want me to run it anyway, please confirm by saying 'yes, run it' or 'approve'."
|
"message": f"BLOCKED: This command is potentially dangerous ({description}). Tell the user and ask if they want to add this command pattern to their allowlist. They can do this via 'hermes config edit' or by running the command directly on their machine."
|
||||||
}
|
}
|
||||||
|
|
||||||
# CLI context - prompt user
|
# CLI context - prompt user
|
||||||
choice = _prompt_dangerous_approval(command, description)
|
choice = _prompt_dangerous_approval(command, description)
|
||||||
|
|
||||||
if choice == "deny":
|
if choice == "deny":
|
||||||
return {"approved": False, "message": "Command denied by user"}
|
return {"approved": False, "message": "BLOCKED: User denied this potentially dangerous command. Do NOT retry this command - the user has explicitly rejected it."}
|
||||||
|
|
||||||
# Handle approval
|
# Handle approval
|
||||||
if choice == "session":
|
if choice == "session":
|
||||||
@@ -1304,7 +1305,7 @@ def terminal_tool(
|
|||||||
>>> result = terminal_tool(command="long_task.sh", timeout=300)
|
>>> result = terminal_tool(command="long_task.sh", timeout=300)
|
||||||
|
|
||||||
# Force run after user confirmation
|
# Force run after user confirmation
|
||||||
>>> result = terminal_tool(command="rm -rf /tmp/old", force=True)
|
# Note: force parameter is internal only, not exposed to model API
|
||||||
"""
|
"""
|
||||||
global _active_environments, _last_activity
|
global _active_environments, _last_activity
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user