"""Dangerous command approval -- detection, prompting, and per-session state. This module is the single source of truth for the dangerous command system: - Pattern detection (DANGEROUS_PATTERNS, detect_dangerous_command) - Per-session approval state (thread-safe, keyed by session_key) - Approval prompting (CLI interactive + gateway async) - Permanent allowlist persistence (config.yaml) """ import logging import os import re import sys import threading from typing import Optional logger = logging.getLogger(__name__) # ========================================================================= # Dangerous command patterns # ========================================================================= DANGEROUS_PATTERNS = [ (r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"), (r'\brm\s+-[^\s]*r', "recursive delete"), (r'\brm\s+--recursive\b', "recursive delete (long flag)"), (r'\bchmod\s+(-[^\s]*\s+)*777\b', "world-writable permissions"), (r'\bchmod\s+--recursive\b.*777', "recursive world-writable (long flag)"), (r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"), (r'\bchown\s+--recursive\b.*root', "recursive chown to root (long flag)"), (r'\bmkfs\b', "format filesystem"), (r'\bdd\s+.*if=', "disk copy"), (r'>\s*/dev/sd', "write to block device"), (r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"), (r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"), (r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"), (r'>\s*/etc/', "overwrite system config"), (r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"), (r'\bkill\s+-9\s+-1\b', "kill all processes"), (r'\bpkill\s+-9\b', "force kill processes"), (r':()\s*{\s*:\s*\|\s*:&\s*}\s*;:', "fork bomb"), (r'\b(bash|sh|zsh)\s+-c\s+', "shell command via -c flag"), (r'\b(python[23]?|perl|ruby|node)\s+-[ec]\s+', "script execution via -e/-c flag"), (r'\b(curl|wget)\b.*\|\s*(ba)?sh\b', "pipe remote content to shell"), (r'\b(bash|sh|zsh|ksh)\s+<\s* tuple: """Check if a command matches any dangerous patterns. Returns: (is_dangerous, pattern_key, description) or (False, None, None) """ command_lower = command.lower() for pattern, description in DANGEROUS_PATTERNS: if re.search(pattern, command_lower, re.IGNORECASE | re.DOTALL): pattern_key = pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20] return (True, pattern_key, description) return (False, None, None) # ========================================================================= # Per-session approval state (thread-safe) # ========================================================================= _lock = threading.Lock() _pending: dict[str, dict] = {} _session_approved: dict[str, set] = {} _permanent_approved: set = set() def submit_pending(session_key: str, approval: dict): """Store a pending approval request for a session.""" with _lock: _pending[session_key] = approval def pop_pending(session_key: str) -> Optional[dict]: """Retrieve and remove a pending approval for a session.""" with _lock: return _pending.pop(session_key, None) def has_pending(session_key: str) -> bool: """Check if a session has a pending approval request.""" with _lock: return session_key in _pending def approve_session(session_key: str, pattern_key: str): """Approve a pattern for this session only.""" with _lock: _session_approved.setdefault(session_key, set()).add(pattern_key) def is_approved(session_key: str, pattern_key: str) -> bool: """Check if a pattern is approved (session-scoped or permanent).""" with _lock: if pattern_key in _permanent_approved: return True return pattern_key in _session_approved.get(session_key, set()) def approve_permanent(pattern_key: str): """Add a pattern to the permanent allowlist.""" with _lock: _permanent_approved.add(pattern_key) def load_permanent(patterns: set): """Bulk-load permanent allowlist entries from config.""" with _lock: _permanent_approved.update(patterns) def clear_session(session_key: str): """Clear all approvals and pending requests for a session.""" with _lock: _session_approved.pop(session_key, None) _pending.pop(session_key, None) # ========================================================================= # Config persistence for permanent allowlist # ========================================================================= def load_permanent_allowlist() -> set: """Load permanently allowed command patterns from config. Also syncs them into the approval module so is_approved() works for patterns added via 'always' in a previous session. """ try: from hermes_cli.config import load_config config = load_config() patterns = set(config.get("command_allowlist", []) or []) if patterns: load_permanent(patterns) return patterns except Exception: return set() def save_permanent_allowlist(patterns: set): """Save permanently allowed command patterns to config.""" try: from hermes_cli.config import load_config, save_config config = load_config() config["command_allowlist"] = list(patterns) save_config(config) except Exception as e: logger.warning("Could not save allowlist: %s", e) # ========================================================================= # Approval prompting + orchestration # ========================================================================= def prompt_dangerous_approval(command: str, description: str, timeout_seconds: int = 60, approval_callback=None) -> str: """Prompt the user to approve a dangerous command (CLI only). Args: approval_callback: Optional callback registered by the CLI for prompt_toolkit integration. Signature: (command, description) -> str. Returns: 'once', 'session', 'always', or 'deny' """ if approval_callback is not None: try: return approval_callback(command, description) except Exception: return "deny" os.environ["HERMES_SPINNER_PAUSE"] = "1" try: print() print(f" ⚠️ DANGEROUS COMMAND: {description}") print(f" {command[:80]}{'...' if len(command) > 80 else ''}") print() print(f" [o]nce | [s]ession | [a]lways | [d]eny") print() sys.stdout.flush() result = {"choice": ""} def get_input(): try: result["choice"] = input(" Choice [o/s/a/D]: ").strip().lower() except (EOFError, OSError): result["choice"] = "" thread = threading.Thread(target=get_input, daemon=True) thread.start() thread.join(timeout=timeout_seconds) if thread.is_alive(): print("\n ⏱ Timeout - denying command") return "deny" choice = result["choice"] if choice in ('o', 'once'): print(" ✓ Allowed once") return "once" elif choice in ('s', 'session'): print(" ✓ Allowed for this session") return "session" elif choice in ('a', 'always'): print(" ✓ Added to permanent allowlist") return "always" else: print(" ✗ Denied") return "deny" except (EOFError, KeyboardInterrupt): print("\n ✗ Cancelled") return "deny" finally: if "HERMES_SPINNER_PAUSE" in os.environ: del os.environ["HERMES_SPINNER_PAUSE"] print() sys.stdout.flush() def check_dangerous_command(command: str, env_type: str, approval_callback=None) -> dict: """Check if a command is dangerous and handle approval. This is the main entry point called by terminal_tool before executing any command. It orchestrates detection, session checks, and prompting. Args: command: The shell command to check. env_type: Terminal backend type ('local', 'ssh', 'docker', etc.). approval_callback: Optional CLI callback for interactive prompts. Returns: {"approved": True/False, "message": str or None, ...} """ if env_type in ("docker", "singularity", "modal", "daytona"): return {"approved": True, "message": None} is_dangerous, pattern_key, description = detect_dangerous_command(command) if not is_dangerous: return {"approved": True, "message": None} session_key = os.getenv("HERMES_SESSION_KEY", "default") if is_approved(session_key, pattern_key): return {"approved": True, "message": None} is_cli = os.getenv("HERMES_INTERACTIVE") is_gateway = os.getenv("HERMES_GATEWAY_SESSION") if not is_cli and not is_gateway: return {"approved": True, "message": None} if is_gateway or os.getenv("HERMES_EXEC_ASK"): submit_pending(session_key, { "command": command, "pattern_key": pattern_key, "description": description, }) return { "approved": False, "pattern_key": pattern_key, "status": "approval_required", "command": command, "description": description, "message": f"⚠️ This command is potentially dangerous ({description}). Asking the user for approval...", } choice = prompt_dangerous_approval(command, description, approval_callback=approval_callback) if choice == "deny": return { "approved": False, "message": f"BLOCKED: User denied this potentially dangerous command (matched '{description}' pattern). Do NOT retry this command - the user has explicitly rejected it.", "pattern_key": pattern_key, "description": description, } if choice == "session": approve_session(session_key, pattern_key) elif choice == "always": approve_session(session_key, pattern_key) approve_permanent(pattern_key) save_permanent_allowlist(_permanent_approved) return {"approved": True, "message": None}