The gateway's dangerous command approval system was fundamentally broken: the agent loop continued running after a command was flagged, and the approval request only reached the user after the agent finished its entire conversation loop. By then the context was lost. This change makes the gateway approval mirror the CLI's synchronous behavior. When a dangerous command is detected: 1. The agent thread blocks on a threading.Event 2. The approval request is sent to the user immediately 3. The user responds with /approve or /deny 4. The event is signaled and the agent resumes with the real result The agent never sees 'approval_required' as a tool result. It either gets the command output (approved) or a definitive BLOCKED message (denied/timed out) — same as CLI mode. Queue-based design supports multiple concurrent approvals (parallel subagents via delegate_task, execute_code RPC handlers). Each approval gets its own _ApprovalEntry with its own threading.Event. /approve resolves the oldest (FIFO); /approve all resolves all at once. Changes: - tools/approval.py: Queue-based per-session blocking gateway approval (register/unregister callbacks, resolve with FIFO or all-at-once) - gateway/run.py: Register approval callback in run_sync(), remove post-loop pop_pending hack, /approve and /deny support 'all' flag - tests: 21 tests including parallel subagent E2E scenarios
841 lines
33 KiB
Python
841 lines
33 KiB
Python
"""Dangerous command approval -- detection, prompting, and per-session state.
|
|
|
|
This module is the single source of truth for the dangerous command system:
|
|
- Pattern detection (DANGEROUS_PATTERNS, detect_dangerous_command)
|
|
- Per-session approval state (thread-safe, keyed by session_key)
|
|
- Approval prompting (CLI interactive + gateway async)
|
|
- Smart approval via auxiliary LLM (auto-approve low-risk commands)
|
|
- Permanent allowlist persistence (config.yaml)
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import threading
|
|
import unicodedata
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Sensitive write targets that should trigger approval even when referenced
|
|
# via shell expansions like $HOME or $HERMES_HOME.
|
|
_SSH_SENSITIVE_PATH = r'(?:~|\$home|\$\{home\})/\.ssh(?:/|$)'
|
|
_HERMES_ENV_PATH = (
|
|
r'(?:~\/\.hermes/|'
|
|
r'(?:\$home|\$\{home\})/\.hermes/|'
|
|
r'(?:\$hermes_home|\$\{hermes_home\})/)'
|
|
r'\.env\b'
|
|
)
|
|
_SENSITIVE_WRITE_TARGET = (
|
|
r'(?:/etc/|/dev/sd|'
|
|
rf'{_SSH_SENSITIVE_PATH}|'
|
|
rf'{_HERMES_ENV_PATH})'
|
|
)
|
|
|
|
# =========================================================================
|
|
# Dangerous command patterns
|
|
# =========================================================================
|
|
|
|
DANGEROUS_PATTERNS = [
|
|
(r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
|
|
(r'\brm\s+-[^\s]*r', "recursive delete"),
|
|
(r'\brm\s+--recursive\b', "recursive delete (long flag)"),
|
|
(r'\bchmod\s+(-[^\s]*\s+)*(777|666|o\+[rwx]*w|a\+[rwx]*w)\b', "world/other-writable permissions"),
|
|
(r'\bchmod\s+--recursive\b.*(777|666|o\+[rwx]*w|a\+[rwx]*w)', "recursive world/other-writable (long flag)"),
|
|
(r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
|
|
(r'\bchown\s+--recursive\b.*root', "recursive chown to root (long flag)"),
|
|
(r'\bmkfs\b', "format filesystem"),
|
|
(r'\bdd\s+.*if=', "disk copy"),
|
|
(r'>\s*/dev/sd', "write to block device"),
|
|
(r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
|
|
(r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
|
|
(r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
|
|
(r'>\s*/etc/', "overwrite system config"),
|
|
(r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"),
|
|
(r'\bkill\s+-9\s+-1\b', "kill all processes"),
|
|
(r'\bpkill\s+-9\b', "force kill processes"),
|
|
(r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
|
|
# Any shell invocation via -c or combined flags like -lc, -ic, etc.
|
|
(r'\b(bash|sh|zsh|ksh)\s+-[^\s]*c(\s+|$)', "shell command via -c/-lc flag"),
|
|
(r'\b(python[23]?|perl|ruby|node)\s+-[ec]\s+', "script execution via -e/-c flag"),
|
|
(r'\b(curl|wget)\b.*\|\s*(ba)?sh\b', "pipe remote content to shell"),
|
|
(r'\b(bash|sh|zsh|ksh)\s+<\s*<?\s*\(\s*(curl|wget)\b', "execute remote script via process substitution"),
|
|
(rf'\btee\b.*["\']?{_SENSITIVE_WRITE_TARGET}', "overwrite system file via tee"),
|
|
(rf'>>?\s*["\']?{_SENSITIVE_WRITE_TARGET}', "overwrite system file via redirection"),
|
|
(r'\bxargs\s+.*\brm\b', "xargs with rm"),
|
|
(r'\bfind\b.*-exec\s+(/\S*/)?rm\b', "find -exec rm"),
|
|
(r'\bfind\b.*-delete\b', "find -delete"),
|
|
# Gateway protection: never start gateway outside systemd management
|
|
(r'gateway\s+run\b.*(&\s*$|&\s*;|\bdisown\b|\bsetsid\b)', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
|
|
(r'\bnohup\b.*gateway\s+run\b', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
|
|
# Self-termination protection: prevent agent from killing its own process
|
|
(r'\b(pkill|killall)\b.*\b(hermes|gateway|cli\.py)\b', "kill hermes/gateway process (self-termination)"),
|
|
# File copy/move/edit into sensitive system paths
|
|
(r'\b(cp|mv|install)\b.*\s/etc/', "copy/move file into /etc/"),
|
|
(r'\bsed\s+-[^\s]*i.*\s/etc/', "in-place edit of system config"),
|
|
(r'\bsed\s+--in-place\b.*\s/etc/', "in-place edit of system config (long flag)"),
|
|
]
|
|
|
|
|
|
def _legacy_pattern_key(pattern: str) -> str:
|
|
"""Reproduce the old regex-derived approval key for backwards compatibility."""
|
|
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
|
|
|
|
|
|
_PATTERN_KEY_ALIASES: dict[str, set[str]] = {}
|
|
for _pattern, _description in DANGEROUS_PATTERNS:
|
|
_legacy_key = _legacy_pattern_key(_pattern)
|
|
_canonical_key = _description
|
|
_PATTERN_KEY_ALIASES.setdefault(_canonical_key, set()).update({_canonical_key, _legacy_key})
|
|
_PATTERN_KEY_ALIASES.setdefault(_legacy_key, set()).update({_legacy_key, _canonical_key})
|
|
|
|
|
|
def _approval_key_aliases(pattern_key: str) -> set[str]:
|
|
"""Return all approval keys that should match this pattern.
|
|
|
|
New approvals use the human-readable description string, but older
|
|
command_allowlist entries and session approvals may still contain the
|
|
historical regex-derived key.
|
|
"""
|
|
return _PATTERN_KEY_ALIASES.get(pattern_key, {pattern_key})
|
|
|
|
|
|
# =========================================================================
|
|
# Detection
|
|
# =========================================================================
|
|
|
|
def _normalize_command_for_detection(command: str) -> str:
|
|
"""Normalize a command string before dangerous-pattern matching.
|
|
|
|
Strips ANSI escape sequences (full ECMA-48 via tools.ansi_strip),
|
|
null bytes, and normalizes Unicode fullwidth characters so that
|
|
obfuscation techniques cannot bypass the pattern-based detection.
|
|
"""
|
|
from tools.ansi_strip import strip_ansi
|
|
|
|
# Strip all ANSI escape sequences (CSI, OSC, DCS, 8-bit C1, etc.)
|
|
command = strip_ansi(command)
|
|
# Strip null bytes
|
|
command = command.replace('\x00', '')
|
|
# Normalize Unicode (fullwidth Latin, halfwidth Katakana, etc.)
|
|
command = unicodedata.normalize('NFKC', command)
|
|
return command
|
|
|
|
|
|
def detect_dangerous_command(command: str) -> tuple:
|
|
"""Check if a command matches any dangerous patterns.
|
|
|
|
Returns:
|
|
(is_dangerous, pattern_key, description) or (False, None, None)
|
|
"""
|
|
command_lower = _normalize_command_for_detection(command).lower()
|
|
for pattern, description in DANGEROUS_PATTERNS:
|
|
if re.search(pattern, command_lower, re.IGNORECASE | re.DOTALL):
|
|
pattern_key = description
|
|
return (True, pattern_key, description)
|
|
return (False, None, None)
|
|
|
|
|
|
# =========================================================================
|
|
# Per-session approval state (thread-safe)
|
|
# =========================================================================
|
|
|
|
_lock = threading.Lock()
|
|
_pending: dict[str, dict] = {}
|
|
_session_approved: dict[str, set] = {}
|
|
_permanent_approved: set = set()
|
|
|
|
# =========================================================================
|
|
# Blocking gateway approval (mirrors CLI's synchronous input() flow)
|
|
# =========================================================================
|
|
# Per-session QUEUE of pending approvals. Multiple threads (parallel
|
|
# subagents, execute_code RPC handlers) can block concurrently — each gets
|
|
# its own threading.Event. /approve resolves the oldest, /approve all
|
|
# resolves every pending approval in the session.
|
|
|
|
|
|
class _ApprovalEntry:
|
|
"""One pending dangerous-command approval inside a gateway session."""
|
|
__slots__ = ("event", "data", "result")
|
|
|
|
def __init__(self, data: dict):
|
|
self.event = threading.Event()
|
|
self.data = data # command, description, pattern_keys, …
|
|
self.result: Optional[str] = None # "once"|"session"|"always"|"deny"
|
|
|
|
|
|
_gateway_queues: dict[str, list] = {} # session_key → [_ApprovalEntry, …]
|
|
_gateway_notify_cbs: dict[str, object] = {} # session_key → callable(approval_data)
|
|
|
|
|
|
def register_gateway_notify(session_key: str, cb) -> None:
|
|
"""Register a per-session callback for sending approval requests to the user.
|
|
|
|
The callback signature is ``cb(approval_data: dict) -> None`` where
|
|
*approval_data* contains ``command``, ``description``, and
|
|
``pattern_keys``. The callback bridges sync→async (runs in the agent
|
|
thread, must schedule the actual send on the event loop).
|
|
"""
|
|
with _lock:
|
|
_gateway_notify_cbs[session_key] = cb
|
|
|
|
|
|
def unregister_gateway_notify(session_key: str) -> None:
|
|
"""Unregister the per-session gateway approval callback.
|
|
|
|
Signals ALL blocked threads for this session so they don't hang forever
|
|
(e.g. when the agent run finishes or is interrupted).
|
|
"""
|
|
with _lock:
|
|
_gateway_notify_cbs.pop(session_key, None)
|
|
entries = _gateway_queues.pop(session_key, [])
|
|
for entry in entries:
|
|
entry.event.set()
|
|
|
|
|
|
def resolve_gateway_approval(session_key: str, choice: str,
|
|
resolve_all: bool = False) -> int:
|
|
"""Called by the gateway's /approve or /deny handler to unblock
|
|
waiting agent thread(s).
|
|
|
|
When *resolve_all* is True every pending approval in the session is
|
|
resolved at once (``/approve all``). Otherwise only the oldest one
|
|
is resolved (FIFO).
|
|
|
|
Returns the number of approvals resolved (0 means nothing was pending).
|
|
"""
|
|
with _lock:
|
|
queue = _gateway_queues.get(session_key)
|
|
if not queue:
|
|
return 0
|
|
if resolve_all:
|
|
targets = list(queue)
|
|
queue.clear()
|
|
else:
|
|
targets = [queue.pop(0)]
|
|
if not queue:
|
|
_gateway_queues.pop(session_key, None)
|
|
|
|
for entry in targets:
|
|
entry.result = choice
|
|
entry.event.set()
|
|
return len(targets)
|
|
|
|
|
|
def has_blocking_approval(session_key: str) -> bool:
|
|
"""Check if a session has one or more blocking gateway approvals waiting."""
|
|
with _lock:
|
|
return bool(_gateway_queues.get(session_key))
|
|
|
|
|
|
def pending_approval_count(session_key: str) -> int:
|
|
"""Return the number of pending blocking approvals for a session."""
|
|
with _lock:
|
|
return len(_gateway_queues.get(session_key, []))
|
|
|
|
|
|
def submit_pending(session_key: str, approval: dict):
|
|
"""Store a pending approval request for a session."""
|
|
with _lock:
|
|
_pending[session_key] = approval
|
|
|
|
|
|
def pop_pending(session_key: str) -> Optional[dict]:
|
|
"""Retrieve and remove a pending approval for a session."""
|
|
with _lock:
|
|
return _pending.pop(session_key, None)
|
|
|
|
|
|
def has_pending(session_key: str) -> bool:
|
|
"""Check if a session has a pending approval request."""
|
|
with _lock:
|
|
return session_key in _pending
|
|
|
|
|
|
def approve_session(session_key: str, pattern_key: str):
|
|
"""Approve a pattern for this session only."""
|
|
with _lock:
|
|
_session_approved.setdefault(session_key, set()).add(pattern_key)
|
|
|
|
|
|
def is_approved(session_key: str, pattern_key: str) -> bool:
|
|
"""Check if a pattern is approved (session-scoped or permanent).
|
|
|
|
Accept both the current canonical key and the legacy regex-derived key so
|
|
existing command_allowlist entries continue to work after key migrations.
|
|
"""
|
|
aliases = _approval_key_aliases(pattern_key)
|
|
with _lock:
|
|
if any(alias in _permanent_approved for alias in aliases):
|
|
return True
|
|
session_approvals = _session_approved.get(session_key, set())
|
|
return any(alias in session_approvals for alias in aliases)
|
|
|
|
|
|
def approve_permanent(pattern_key: str):
|
|
"""Add a pattern to the permanent allowlist."""
|
|
with _lock:
|
|
_permanent_approved.add(pattern_key)
|
|
|
|
|
|
def load_permanent(patterns: set):
|
|
"""Bulk-load permanent allowlist entries from config."""
|
|
with _lock:
|
|
_permanent_approved.update(patterns)
|
|
|
|
|
|
def clear_session(session_key: str):
|
|
"""Clear all approvals and pending requests for a session."""
|
|
with _lock:
|
|
_session_approved.pop(session_key, None)
|
|
_pending.pop(session_key, None)
|
|
_gateway_notify_cbs.pop(session_key, None)
|
|
# Signal ALL blocked threads so they don't hang forever
|
|
entries = _gateway_queues.pop(session_key, [])
|
|
for entry in entries:
|
|
entry.event.set()
|
|
|
|
|
|
# =========================================================================
|
|
# Config persistence for permanent allowlist
|
|
# =========================================================================
|
|
|
|
def load_permanent_allowlist() -> set:
|
|
"""Load permanently allowed command patterns from config.
|
|
|
|
Also syncs them into the approval module so is_approved() works for
|
|
patterns added via 'always' in a previous session.
|
|
"""
|
|
try:
|
|
from hermes_cli.config import load_config
|
|
config = load_config()
|
|
patterns = set(config.get("command_allowlist", []) or [])
|
|
if patterns:
|
|
load_permanent(patterns)
|
|
return patterns
|
|
except Exception:
|
|
return set()
|
|
|
|
|
|
def save_permanent_allowlist(patterns: set):
|
|
"""Save permanently allowed command patterns to config."""
|
|
try:
|
|
from hermes_cli.config import load_config, save_config
|
|
config = load_config()
|
|
config["command_allowlist"] = list(patterns)
|
|
save_config(config)
|
|
except Exception as e:
|
|
logger.warning("Could not save allowlist: %s", e)
|
|
|
|
|
|
# =========================================================================
|
|
# Approval prompting + orchestration
|
|
# =========================================================================
|
|
|
|
def prompt_dangerous_approval(command: str, description: str,
|
|
timeout_seconds: int | None = None,
|
|
allow_permanent: bool = True,
|
|
approval_callback=None) -> str:
|
|
"""Prompt the user to approve a dangerous command (CLI only).
|
|
|
|
Args:
|
|
allow_permanent: When False, hide the [a]lways option (used when
|
|
tirith warnings are present, since broad permanent allowlisting
|
|
is inappropriate for content-level security findings).
|
|
approval_callback: Optional callback registered by the CLI for
|
|
prompt_toolkit integration. Signature:
|
|
(command, description, *, allow_permanent=True) -> str.
|
|
|
|
Returns: 'once', 'session', 'always', or 'deny'
|
|
"""
|
|
if timeout_seconds is None:
|
|
timeout_seconds = _get_approval_timeout()
|
|
|
|
if approval_callback is not None:
|
|
try:
|
|
return approval_callback(command, description,
|
|
allow_permanent=allow_permanent)
|
|
except Exception:
|
|
return "deny"
|
|
|
|
os.environ["HERMES_SPINNER_PAUSE"] = "1"
|
|
try:
|
|
while True:
|
|
print()
|
|
print(f" ⚠️ DANGEROUS COMMAND: {description}")
|
|
print(f" {command}")
|
|
print()
|
|
if allow_permanent:
|
|
print(" [o]nce | [s]ession | [a]lways | [d]eny")
|
|
else:
|
|
print(" [o]nce | [s]ession | [d]eny")
|
|
print()
|
|
sys.stdout.flush()
|
|
|
|
result = {"choice": ""}
|
|
|
|
def get_input():
|
|
try:
|
|
prompt = " Choice [o/s/a/D]: " if allow_permanent else " Choice [o/s/D]: "
|
|
result["choice"] = input(prompt).strip().lower()
|
|
except (EOFError, OSError):
|
|
result["choice"] = ""
|
|
|
|
thread = threading.Thread(target=get_input, daemon=True)
|
|
thread.start()
|
|
thread.join(timeout=timeout_seconds)
|
|
|
|
if thread.is_alive():
|
|
print("\n ⏱ Timeout - denying command")
|
|
return "deny"
|
|
|
|
choice = result["choice"]
|
|
if choice in ('o', 'once'):
|
|
print(" ✓ Allowed once")
|
|
return "once"
|
|
elif choice in ('s', 'session'):
|
|
print(" ✓ Allowed for this session")
|
|
return "session"
|
|
elif choice in ('a', 'always'):
|
|
if not allow_permanent:
|
|
print(" ✓ Allowed for this session")
|
|
return "session"
|
|
print(" ✓ Added to permanent allowlist")
|
|
return "always"
|
|
else:
|
|
print(" ✗ Denied")
|
|
return "deny"
|
|
|
|
except (EOFError, KeyboardInterrupt):
|
|
print("\n ✗ Cancelled")
|
|
return "deny"
|
|
finally:
|
|
if "HERMES_SPINNER_PAUSE" in os.environ:
|
|
del os.environ["HERMES_SPINNER_PAUSE"]
|
|
print()
|
|
sys.stdout.flush()
|
|
|
|
|
|
def _normalize_approval_mode(mode) -> str:
|
|
"""Normalize approval mode values loaded from YAML/config.
|
|
|
|
YAML 1.1 treats bare words like `off` as booleans, so a config entry like
|
|
`approvals:\n mode: off` is parsed as False unless quoted. Treat that as the
|
|
intended string mode instead of falling back to manual approvals.
|
|
"""
|
|
if isinstance(mode, bool):
|
|
return "off" if mode is False else "manual"
|
|
if isinstance(mode, str):
|
|
normalized = mode.strip().lower()
|
|
return normalized or "manual"
|
|
return "manual"
|
|
|
|
|
|
def _get_approval_config() -> dict:
|
|
"""Read the approvals config block. Returns a dict with 'mode', 'timeout', etc."""
|
|
try:
|
|
from hermes_cli.config import load_config
|
|
config = load_config()
|
|
return config.get("approvals", {}) or {}
|
|
except Exception:
|
|
return {}
|
|
|
|
|
|
def _get_approval_mode() -> str:
|
|
"""Read the approval mode from config. Returns 'manual', 'smart', or 'off'."""
|
|
mode = _get_approval_config().get("mode", "manual")
|
|
return _normalize_approval_mode(mode)
|
|
|
|
|
|
def _get_approval_timeout() -> int:
|
|
"""Read the approval timeout from config. Defaults to 60 seconds."""
|
|
try:
|
|
return int(_get_approval_config().get("timeout", 60))
|
|
except (ValueError, TypeError):
|
|
return 60
|
|
|
|
|
|
def _smart_approve(command: str, description: str) -> str:
|
|
"""Use the auxiliary LLM to assess risk and decide approval.
|
|
|
|
Returns 'approve' if the LLM determines the command is safe,
|
|
'deny' if genuinely dangerous, or 'escalate' if uncertain.
|
|
|
|
Inspired by OpenAI Codex's Smart Approvals guardian subagent
|
|
(openai/codex#13860).
|
|
"""
|
|
try:
|
|
from agent.auxiliary_client import get_text_auxiliary_client, auxiliary_max_tokens_param
|
|
|
|
client, model = get_text_auxiliary_client(task="approval")
|
|
if not client or not model:
|
|
logger.debug("Smart approvals: no aux client available, escalating")
|
|
return "escalate"
|
|
|
|
prompt = f"""You are a security reviewer for an AI coding agent. A terminal command was flagged by pattern matching as potentially dangerous.
|
|
|
|
Command: {command}
|
|
Flagged reason: {description}
|
|
|
|
Assess the ACTUAL risk of this command. Many flagged commands are false positives — for example, `python -c "print('hello')"` is flagged as "script execution via -c flag" but is completely harmless.
|
|
|
|
Rules:
|
|
- APPROVE if the command is clearly safe (benign script execution, safe file operations, development tools, package installs, git operations, etc.)
|
|
- DENY if the command could genuinely damage the system (recursive delete of important paths, overwriting system files, fork bombs, wiping disks, dropping databases, etc.)
|
|
- ESCALATE if you're uncertain
|
|
|
|
Respond with exactly one word: APPROVE, DENY, or ESCALATE"""
|
|
|
|
response = client.chat.completions.create(
|
|
model=model,
|
|
messages=[{"role": "user", "content": prompt}],
|
|
**auxiliary_max_tokens_param(16),
|
|
temperature=0,
|
|
)
|
|
|
|
answer = (response.choices[0].message.content or "").strip().upper()
|
|
|
|
if "APPROVE" in answer:
|
|
return "approve"
|
|
elif "DENY" in answer:
|
|
return "deny"
|
|
else:
|
|
return "escalate"
|
|
|
|
except Exception as e:
|
|
logger.debug("Smart approvals: LLM call failed (%s), escalating", e)
|
|
return "escalate"
|
|
|
|
|
|
def check_dangerous_command(command: str, env_type: str,
|
|
approval_callback=None) -> dict:
|
|
"""Check if a command is dangerous and handle approval.
|
|
|
|
This is the main entry point called by terminal_tool before executing
|
|
any command. It orchestrates detection, session checks, and prompting.
|
|
|
|
Args:
|
|
command: The shell command to check.
|
|
env_type: Terminal backend type ('local', 'ssh', 'docker', etc.).
|
|
approval_callback: Optional CLI callback for interactive prompts.
|
|
|
|
Returns:
|
|
{"approved": True/False, "message": str or None, ...}
|
|
"""
|
|
if env_type in ("docker", "singularity", "modal", "daytona"):
|
|
return {"approved": True, "message": None}
|
|
|
|
# --yolo: bypass all approval prompts
|
|
if os.getenv("HERMES_YOLO_MODE"):
|
|
return {"approved": True, "message": None}
|
|
|
|
is_dangerous, pattern_key, description = detect_dangerous_command(command)
|
|
if not is_dangerous:
|
|
return {"approved": True, "message": None}
|
|
|
|
session_key = os.getenv("HERMES_SESSION_KEY", "default")
|
|
if is_approved(session_key, pattern_key):
|
|
return {"approved": True, "message": None}
|
|
|
|
is_cli = os.getenv("HERMES_INTERACTIVE")
|
|
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
|
|
|
if not is_cli and not is_gateway:
|
|
return {"approved": True, "message": None}
|
|
|
|
if is_gateway or os.getenv("HERMES_EXEC_ASK"):
|
|
submit_pending(session_key, {
|
|
"command": command,
|
|
"pattern_key": pattern_key,
|
|
"description": description,
|
|
})
|
|
return {
|
|
"approved": False,
|
|
"pattern_key": pattern_key,
|
|
"status": "approval_required",
|
|
"command": command,
|
|
"description": description,
|
|
"message": (
|
|
f"⚠️ This command is potentially dangerous ({description}). "
|
|
f"Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
|
|
),
|
|
}
|
|
|
|
choice = prompt_dangerous_approval(command, description,
|
|
approval_callback=approval_callback)
|
|
|
|
if choice == "deny":
|
|
return {
|
|
"approved": False,
|
|
"message": f"BLOCKED: User denied this potentially dangerous command (matched '{description}' pattern). Do NOT retry this command - the user has explicitly rejected it.",
|
|
"pattern_key": pattern_key,
|
|
"description": description,
|
|
}
|
|
|
|
if choice == "session":
|
|
approve_session(session_key, pattern_key)
|
|
elif choice == "always":
|
|
approve_session(session_key, pattern_key)
|
|
approve_permanent(pattern_key)
|
|
save_permanent_allowlist(_permanent_approved)
|
|
|
|
return {"approved": True, "message": None}
|
|
|
|
|
|
# =========================================================================
|
|
# Combined pre-exec guard (tirith + dangerous command detection)
|
|
# =========================================================================
|
|
|
|
def _format_tirith_description(tirith_result: dict) -> str:
|
|
"""Build a human-readable description from tirith findings.
|
|
|
|
Includes severity, title, and description for each finding so users
|
|
can make an informed approval decision.
|
|
"""
|
|
findings = tirith_result.get("findings") or []
|
|
if not findings:
|
|
summary = tirith_result.get("summary") or "security issue detected"
|
|
return f"Security scan: {summary}"
|
|
|
|
parts = []
|
|
for f in findings:
|
|
severity = f.get("severity", "")
|
|
title = f.get("title", "")
|
|
desc = f.get("description", "")
|
|
if title and desc:
|
|
parts.append(f"[{severity}] {title}: {desc}" if severity else f"{title}: {desc}")
|
|
elif title:
|
|
parts.append(f"[{severity}] {title}" if severity else title)
|
|
if not parts:
|
|
summary = tirith_result.get("summary") or "security issue detected"
|
|
return f"Security scan: {summary}"
|
|
|
|
return "Security scan — " + "; ".join(parts)
|
|
|
|
|
|
def check_all_command_guards(command: str, env_type: str,
|
|
approval_callback=None) -> dict:
|
|
"""Run all pre-exec security checks and return a single approval decision.
|
|
|
|
Gathers findings from tirith and dangerous-command detection, then
|
|
presents them as a single combined approval request. This prevents
|
|
a gateway force=True replay from bypassing one check when only the
|
|
other was shown to the user.
|
|
"""
|
|
# Skip containers for both checks
|
|
if env_type in ("docker", "singularity", "modal", "daytona"):
|
|
return {"approved": True, "message": None}
|
|
|
|
# --yolo or approvals.mode=off: bypass all approval prompts
|
|
approval_mode = _get_approval_mode()
|
|
if os.getenv("HERMES_YOLO_MODE") or approval_mode == "off":
|
|
return {"approved": True, "message": None}
|
|
|
|
is_cli = os.getenv("HERMES_INTERACTIVE")
|
|
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
|
is_ask = os.getenv("HERMES_EXEC_ASK")
|
|
|
|
# Preserve the existing non-interactive behavior: outside CLI/gateway/ask
|
|
# flows, we do not block on approvals and we skip external guard work.
|
|
if not is_cli and not is_gateway and not is_ask:
|
|
return {"approved": True, "message": None}
|
|
|
|
# --- Phase 1: Gather findings from both checks ---
|
|
|
|
# Tirith check — wrapper guarantees no raise for expected failures.
|
|
# Only catch ImportError (module not installed).
|
|
tirith_result = {"action": "allow", "findings": [], "summary": ""}
|
|
try:
|
|
from tools.tirith_security import check_command_security
|
|
tirith_result = check_command_security(command)
|
|
except ImportError:
|
|
pass # tirith module not installed — allow
|
|
|
|
# Dangerous command check (detection only, no approval)
|
|
is_dangerous, pattern_key, description = detect_dangerous_command(command)
|
|
|
|
# --- Phase 2: Decide ---
|
|
|
|
# Collect warnings that need approval
|
|
warnings = [] # list of (pattern_key, description, is_tirith)
|
|
|
|
session_key = os.getenv("HERMES_SESSION_KEY", "default")
|
|
|
|
# Tirith block/warn → approvable warning with rich findings.
|
|
# Previously, tirith "block" was a hard block with no approval prompt.
|
|
# Now both block and warn go through the approval flow so users can
|
|
# inspect the explanation and approve if they understand the risk.
|
|
if tirith_result["action"] in ("block", "warn"):
|
|
findings = tirith_result.get("findings") or []
|
|
rule_id = findings[0].get("rule_id", "unknown") if findings else "unknown"
|
|
tirith_key = f"tirith:{rule_id}"
|
|
tirith_desc = _format_tirith_description(tirith_result)
|
|
if not is_approved(session_key, tirith_key):
|
|
warnings.append((tirith_key, tirith_desc, True))
|
|
|
|
if is_dangerous:
|
|
if not is_approved(session_key, pattern_key):
|
|
warnings.append((pattern_key, description, False))
|
|
|
|
# Nothing to warn about
|
|
if not warnings:
|
|
return {"approved": True, "message": None}
|
|
|
|
# --- Phase 2.5: Smart approval (auxiliary LLM risk assessment) ---
|
|
# When approvals.mode=smart, ask the aux LLM before prompting the user.
|
|
# Inspired by OpenAI Codex's Smart Approvals guardian subagent
|
|
# (openai/codex#13860).
|
|
if approval_mode == "smart":
|
|
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
|
|
verdict = _smart_approve(command, combined_desc_for_llm)
|
|
if verdict == "approve":
|
|
# Auto-approve and grant session-level approval for these patterns
|
|
for key, _, _ in warnings:
|
|
approve_session(session_key, key)
|
|
logger.debug("Smart approval: auto-approved '%s' (%s)",
|
|
command[:60], combined_desc_for_llm)
|
|
return {"approved": True, "message": None,
|
|
"smart_approved": True}
|
|
elif verdict == "deny":
|
|
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
|
|
return {
|
|
"approved": False,
|
|
"message": f"BLOCKED by smart approval: {combined_desc_for_llm}. "
|
|
"The command was assessed as genuinely dangerous. Do NOT retry.",
|
|
"smart_denied": True,
|
|
}
|
|
# verdict == "escalate" → fall through to manual prompt
|
|
|
|
# --- Phase 3: Approval ---
|
|
|
|
# Combine descriptions for a single approval prompt
|
|
combined_desc = "; ".join(desc for _, desc, _ in warnings)
|
|
primary_key = warnings[0][0]
|
|
all_keys = [key for key, _, _ in warnings]
|
|
has_tirith = any(is_t for _, _, is_t in warnings)
|
|
|
|
# Gateway/async approval — block the agent thread until the user
|
|
# responds with /approve or /deny, mirroring the CLI's synchronous
|
|
# input() flow. The agent never sees "approval_required"; it either
|
|
# gets the command output (approved) or a definitive "BLOCKED" message.
|
|
if is_gateway or is_ask:
|
|
notify_cb = None
|
|
with _lock:
|
|
notify_cb = _gateway_notify_cbs.get(session_key)
|
|
|
|
if notify_cb is not None:
|
|
# --- Blocking gateway approval (queue-based) ---
|
|
# Each call gets its own _ApprovalEntry so parallel subagents
|
|
# and execute_code threads can block concurrently.
|
|
approval_data = {
|
|
"command": command,
|
|
"pattern_key": primary_key,
|
|
"pattern_keys": all_keys,
|
|
"description": combined_desc,
|
|
}
|
|
entry = _ApprovalEntry(approval_data)
|
|
with _lock:
|
|
_gateway_queues.setdefault(session_key, []).append(entry)
|
|
|
|
# Notify the user (bridges sync agent thread → async gateway)
|
|
try:
|
|
notify_cb(approval_data)
|
|
except Exception as exc:
|
|
logger.warning("Gateway approval notify failed: %s", exc)
|
|
with _lock:
|
|
queue = _gateway_queues.get(session_key, [])
|
|
if entry in queue:
|
|
queue.remove(entry)
|
|
if not queue:
|
|
_gateway_queues.pop(session_key, None)
|
|
return {
|
|
"approved": False,
|
|
"message": "BLOCKED: Failed to send approval request to user. Do NOT retry.",
|
|
"pattern_key": primary_key,
|
|
"description": combined_desc,
|
|
}
|
|
|
|
# Block until the user responds or timeout (default 5 min)
|
|
timeout = _get_approval_config().get("gateway_timeout", 300)
|
|
try:
|
|
timeout = int(timeout)
|
|
except (ValueError, TypeError):
|
|
timeout = 300
|
|
resolved = entry.event.wait(timeout=timeout)
|
|
|
|
# Clean up this entry from the queue
|
|
with _lock:
|
|
queue = _gateway_queues.get(session_key, [])
|
|
if entry in queue:
|
|
queue.remove(entry)
|
|
if not queue:
|
|
_gateway_queues.pop(session_key, None)
|
|
|
|
choice = entry.result
|
|
if not resolved or choice is None or choice == "deny":
|
|
reason = "timed out" if not resolved else "denied by user"
|
|
return {
|
|
"approved": False,
|
|
"message": f"BLOCKED: Command {reason}. Do NOT retry this command.",
|
|
"pattern_key": primary_key,
|
|
"description": combined_desc,
|
|
}
|
|
|
|
# User approved — persist based on scope (same logic as CLI)
|
|
for key, _, is_tirith in warnings:
|
|
if choice in ("once", "session") or (choice == "always" and is_tirith):
|
|
approve_session(session_key, key)
|
|
elif choice == "always":
|
|
approve_session(session_key, key)
|
|
approve_permanent(key)
|
|
save_permanent_allowlist(_permanent_approved)
|
|
|
|
return {"approved": True, "message": None}
|
|
|
|
# Fallback: no gateway callback registered (e.g. cron, batch).
|
|
# Return approval_required for backward compat.
|
|
submit_pending(session_key, {
|
|
"command": command,
|
|
"pattern_key": primary_key,
|
|
"pattern_keys": all_keys,
|
|
"description": combined_desc,
|
|
})
|
|
return {
|
|
"approved": False,
|
|
"pattern_key": primary_key,
|
|
"status": "approval_required",
|
|
"command": command,
|
|
"description": combined_desc,
|
|
"message": (
|
|
f"⚠️ {combined_desc}. Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
|
|
),
|
|
}
|
|
|
|
# CLI interactive: single combined prompt
|
|
# Hide [a]lways when any tirith warning is present
|
|
choice = prompt_dangerous_approval(command, combined_desc,
|
|
allow_permanent=not has_tirith,
|
|
approval_callback=approval_callback)
|
|
|
|
if choice == "deny":
|
|
return {
|
|
"approved": False,
|
|
"message": "BLOCKED: User denied. Do NOT retry.",
|
|
"pattern_key": primary_key,
|
|
"description": combined_desc,
|
|
}
|
|
|
|
# Persist approval for each warning individually
|
|
for key, _, is_tirith in warnings:
|
|
if choice == "session" or (choice == "always" and is_tirith):
|
|
# tirith: session only (no permanent broad allowlisting)
|
|
approve_session(session_key, key)
|
|
elif choice == "always":
|
|
# dangerous patterns: permanent allowed
|
|
approve_session(session_key, key)
|
|
approve_permanent(key)
|
|
save_permanent_allowlist(_permanent_approved)
|
|
|
|
return {"approved": True, "message": None}
|