When a dangerous command is detected and the user is prompted for approval, long commands are truncated (80 chars in fallback, 70 chars in the TUI). Users had no way to see the full command before deciding. This adds a 'View full command' option across all approval interfaces: - CLI fallback (tools/approval.py): [v]iew option in the prompt menu. Shows the full command and re-prompts for approval decision. - CLI TUI (cli.py): 'Show full command' choice in the arrow-key selection panel. Expands the command display in-place and removes the view option after use. - CLI callbacks (callbacks.py): 'view' choice added to the list when the command exceeds 70 characters. - Gateway (gateway/run.py): 'full', 'show', 'view' responses reveal the complete command while keeping the approval pending. Includes 7 new tests covering view-then-approve, view-then-deny, short command fallthrough, and double-view behavior. Closes community feedback about the 80-char cap on dangerous commands.
314 lines
11 KiB
Python
314 lines
11 KiB
Python
"""Dangerous command approval -- detection, prompting, and per-session state.
|
|
|
|
This module is the single source of truth for the dangerous command system:
|
|
- Pattern detection (DANGEROUS_PATTERNS, detect_dangerous_command)
|
|
- Per-session approval state (thread-safe, keyed by session_key)
|
|
- Approval prompting (CLI interactive + gateway async)
|
|
- Permanent allowlist persistence (config.yaml)
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import threading
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# =========================================================================
|
|
# Dangerous command patterns
|
|
# =========================================================================
|
|
|
|
DANGEROUS_PATTERNS = [
|
|
(r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
|
|
(r'\brm\s+-[^\s]*r', "recursive delete"),
|
|
(r'\brm\s+--recursive\b', "recursive delete (long flag)"),
|
|
(r'\bchmod\s+(-[^\s]*\s+)*777\b', "world-writable permissions"),
|
|
(r'\bchmod\s+--recursive\b.*777', "recursive world-writable (long flag)"),
|
|
(r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
|
|
(r'\bchown\s+--recursive\b.*root', "recursive chown to root (long flag)"),
|
|
(r'\bmkfs\b', "format filesystem"),
|
|
(r'\bdd\s+.*if=', "disk copy"),
|
|
(r'>\s*/dev/sd', "write to block device"),
|
|
(r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
|
|
(r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
|
|
(r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
|
|
(r'>\s*/etc/', "overwrite system config"),
|
|
(r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"),
|
|
(r'\bkill\s+-9\s+-1\b', "kill all processes"),
|
|
(r'\bpkill\s+-9\b', "force kill processes"),
|
|
(r':()\s*{\s*:\s*\|\s*:&\s*}\s*;:', "fork bomb"),
|
|
(r'\b(bash|sh|zsh)\s+-c\s+', "shell command via -c flag"),
|
|
(r'\b(python[23]?|perl|ruby|node)\s+-[ec]\s+', "script execution via -e/-c flag"),
|
|
(r'\b(curl|wget)\b.*\|\s*(ba)?sh\b', "pipe remote content to shell"),
|
|
(r'\b(bash|sh|zsh|ksh)\s+<\s*<?\s*\(\s*(curl|wget)\b', "execute remote script via process substitution"),
|
|
(r'\btee\b.*(/etc/|/dev/sd|\.ssh/|\.hermes/\.env)', "overwrite system file via tee"),
|
|
(r'\bxargs\s+.*\brm\b', "xargs with rm"),
|
|
(r'\bfind\b.*-exec\s+(/\S*/)?rm\b', "find -exec rm"),
|
|
(r'\bfind\b.*-delete\b', "find -delete"),
|
|
]
|
|
|
|
|
|
# =========================================================================
|
|
# Detection
|
|
# =========================================================================
|
|
|
|
def detect_dangerous_command(command: str) -> tuple:
|
|
"""Check if a command matches any dangerous patterns.
|
|
|
|
Returns:
|
|
(is_dangerous, pattern_key, description) or (False, None, None)
|
|
"""
|
|
command_lower = command.lower()
|
|
for pattern, description in DANGEROUS_PATTERNS:
|
|
if re.search(pattern, command_lower, re.IGNORECASE | re.DOTALL):
|
|
pattern_key = pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
|
|
return (True, pattern_key, description)
|
|
return (False, None, None)
|
|
|
|
|
|
# =========================================================================
|
|
# Per-session approval state (thread-safe)
|
|
# =========================================================================
|
|
|
|
_lock = threading.Lock()
|
|
_pending: dict[str, dict] = {}
|
|
_session_approved: dict[str, set] = {}
|
|
_permanent_approved: set = set()
|
|
|
|
|
|
def submit_pending(session_key: str, approval: dict):
|
|
"""Store a pending approval request for a session."""
|
|
with _lock:
|
|
_pending[session_key] = approval
|
|
|
|
|
|
def pop_pending(session_key: str) -> Optional[dict]:
|
|
"""Retrieve and remove a pending approval for a session."""
|
|
with _lock:
|
|
return _pending.pop(session_key, None)
|
|
|
|
|
|
def has_pending(session_key: str) -> bool:
|
|
"""Check if a session has a pending approval request."""
|
|
with _lock:
|
|
return session_key in _pending
|
|
|
|
|
|
def approve_session(session_key: str, pattern_key: str):
|
|
"""Approve a pattern for this session only."""
|
|
with _lock:
|
|
_session_approved.setdefault(session_key, set()).add(pattern_key)
|
|
|
|
|
|
def is_approved(session_key: str, pattern_key: str) -> bool:
|
|
"""Check if a pattern is approved (session-scoped or permanent)."""
|
|
with _lock:
|
|
if pattern_key in _permanent_approved:
|
|
return True
|
|
return pattern_key in _session_approved.get(session_key, set())
|
|
|
|
|
|
def approve_permanent(pattern_key: str):
|
|
"""Add a pattern to the permanent allowlist."""
|
|
with _lock:
|
|
_permanent_approved.add(pattern_key)
|
|
|
|
|
|
def load_permanent(patterns: set):
|
|
"""Bulk-load permanent allowlist entries from config."""
|
|
with _lock:
|
|
_permanent_approved.update(patterns)
|
|
|
|
|
|
def clear_session(session_key: str):
|
|
"""Clear all approvals and pending requests for a session."""
|
|
with _lock:
|
|
_session_approved.pop(session_key, None)
|
|
_pending.pop(session_key, None)
|
|
|
|
|
|
# =========================================================================
|
|
# Config persistence for permanent allowlist
|
|
# =========================================================================
|
|
|
|
def load_permanent_allowlist() -> set:
|
|
"""Load permanently allowed command patterns from config.
|
|
|
|
Also syncs them into the approval module so is_approved() works for
|
|
patterns added via 'always' in a previous session.
|
|
"""
|
|
try:
|
|
from hermes_cli.config import load_config
|
|
config = load_config()
|
|
patterns = set(config.get("command_allowlist", []) or [])
|
|
if patterns:
|
|
load_permanent(patterns)
|
|
return patterns
|
|
except Exception:
|
|
return set()
|
|
|
|
|
|
def save_permanent_allowlist(patterns: set):
|
|
"""Save permanently allowed command patterns to config."""
|
|
try:
|
|
from hermes_cli.config import load_config, save_config
|
|
config = load_config()
|
|
config["command_allowlist"] = list(patterns)
|
|
save_config(config)
|
|
except Exception as e:
|
|
logger.warning("Could not save allowlist: %s", e)
|
|
|
|
|
|
# =========================================================================
|
|
# Approval prompting + orchestration
|
|
# =========================================================================
|
|
|
|
def prompt_dangerous_approval(command: str, description: str,
|
|
timeout_seconds: int = 60,
|
|
approval_callback=None) -> str:
|
|
"""Prompt the user to approve a dangerous command (CLI only).
|
|
|
|
Args:
|
|
approval_callback: Optional callback registered by the CLI for
|
|
prompt_toolkit integration. Signature: (command, description) -> str.
|
|
|
|
Returns: 'once', 'session', 'always', or 'deny'
|
|
"""
|
|
if approval_callback is not None:
|
|
try:
|
|
return approval_callback(command, description)
|
|
except Exception:
|
|
return "deny"
|
|
|
|
os.environ["HERMES_SPINNER_PAUSE"] = "1"
|
|
try:
|
|
is_truncated = len(command) > 80
|
|
while True:
|
|
print()
|
|
print(f" ⚠️ DANGEROUS COMMAND: {description}")
|
|
print(f" {command[:80]}{'...' if is_truncated else ''}")
|
|
print()
|
|
view_hint = " | [v]iew full" if is_truncated else ""
|
|
print(f" [o]nce | [s]ession | [a]lways | [d]eny{view_hint}")
|
|
print()
|
|
sys.stdout.flush()
|
|
|
|
result = {"choice": ""}
|
|
|
|
def get_input():
|
|
try:
|
|
result["choice"] = input(" Choice [o/s/a/D]: ").strip().lower()
|
|
except (EOFError, OSError):
|
|
result["choice"] = ""
|
|
|
|
thread = threading.Thread(target=get_input, daemon=True)
|
|
thread.start()
|
|
thread.join(timeout=timeout_seconds)
|
|
|
|
if thread.is_alive():
|
|
print("\n ⏱ Timeout - denying command")
|
|
return "deny"
|
|
|
|
choice = result["choice"]
|
|
if choice in ('v', 'view') and is_truncated:
|
|
print()
|
|
print(" Full command:")
|
|
print(f" {command}")
|
|
is_truncated = False # show full on next loop iteration too
|
|
continue
|
|
if choice in ('o', 'once'):
|
|
print(" ✓ Allowed once")
|
|
return "once"
|
|
elif choice in ('s', 'session'):
|
|
print(" ✓ Allowed for this session")
|
|
return "session"
|
|
elif choice in ('a', 'always'):
|
|
print(" ✓ Added to permanent allowlist")
|
|
return "always"
|
|
else:
|
|
print(" ✗ Denied")
|
|
return "deny"
|
|
|
|
except (EOFError, KeyboardInterrupt):
|
|
print("\n ✗ Cancelled")
|
|
return "deny"
|
|
finally:
|
|
if "HERMES_SPINNER_PAUSE" in os.environ:
|
|
del os.environ["HERMES_SPINNER_PAUSE"]
|
|
print()
|
|
sys.stdout.flush()
|
|
|
|
|
|
def check_dangerous_command(command: str, env_type: str,
|
|
approval_callback=None) -> dict:
|
|
"""Check if a command is dangerous and handle approval.
|
|
|
|
This is the main entry point called by terminal_tool before executing
|
|
any command. It orchestrates detection, session checks, and prompting.
|
|
|
|
Args:
|
|
command: The shell command to check.
|
|
env_type: Terminal backend type ('local', 'ssh', 'docker', etc.).
|
|
approval_callback: Optional CLI callback for interactive prompts.
|
|
|
|
Returns:
|
|
{"approved": True/False, "message": str or None, ...}
|
|
"""
|
|
if env_type in ("docker", "singularity", "modal", "daytona"):
|
|
return {"approved": True, "message": None}
|
|
|
|
# --yolo: bypass all approval prompts
|
|
if os.getenv("HERMES_YOLO_MODE"):
|
|
return {"approved": True, "message": None}
|
|
|
|
is_dangerous, pattern_key, description = detect_dangerous_command(command)
|
|
if not is_dangerous:
|
|
return {"approved": True, "message": None}
|
|
|
|
session_key = os.getenv("HERMES_SESSION_KEY", "default")
|
|
if is_approved(session_key, pattern_key):
|
|
return {"approved": True, "message": None}
|
|
|
|
is_cli = os.getenv("HERMES_INTERACTIVE")
|
|
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
|
|
|
if not is_cli and not is_gateway:
|
|
return {"approved": True, "message": None}
|
|
|
|
if is_gateway or os.getenv("HERMES_EXEC_ASK"):
|
|
submit_pending(session_key, {
|
|
"command": command,
|
|
"pattern_key": pattern_key,
|
|
"description": description,
|
|
})
|
|
return {
|
|
"approved": False,
|
|
"pattern_key": pattern_key,
|
|
"status": "approval_required",
|
|
"command": command,
|
|
"description": description,
|
|
"message": f"⚠️ This command is potentially dangerous ({description}). Asking the user for approval...",
|
|
}
|
|
|
|
choice = prompt_dangerous_approval(command, description,
|
|
approval_callback=approval_callback)
|
|
|
|
if choice == "deny":
|
|
return {
|
|
"approved": False,
|
|
"message": f"BLOCKED: User denied this potentially dangerous command (matched '{description}' pattern). Do NOT retry this command - the user has explicitly rejected it.",
|
|
"pattern_key": pattern_key,
|
|
"description": description,
|
|
}
|
|
|
|
if choice == "session":
|
|
approve_session(session_key, pattern_key)
|
|
elif choice == "always":
|
|
approve_session(session_key, pattern_key)
|
|
approve_permanent(pattern_key)
|
|
save_permanent_allowlist(_permanent_approved)
|
|
|
|
return {"approved": True, "message": None}
|