Prevent the agent from accidentally killing its own process with pkill -f gateway, killall hermes, etc. Adds a dangerous command pattern that triggers the approval flow. Co-authored-by: arasovic <arasovic@users.noreply.github.com>
635 lines
25 KiB
Python
635 lines
25 KiB
Python
"""Dangerous command approval -- detection, prompting, and per-session state.
|
|
|
|
This module is the single source of truth for the dangerous command system:
|
|
- Pattern detection (DANGEROUS_PATTERNS, detect_dangerous_command)
|
|
- Per-session approval state (thread-safe, keyed by session_key)
|
|
- Approval prompting (CLI interactive + gateway async)
|
|
- Smart approval via auxiliary LLM (auto-approve low-risk commands)
|
|
- Permanent allowlist persistence (config.yaml)
|
|
"""
|
|
|
|
import logging
|
|
import os
|
|
import re
|
|
import sys
|
|
import threading
|
|
import unicodedata
|
|
from typing import Optional
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# =========================================================================
|
|
# Dangerous command patterns
|
|
# =========================================================================
|
|
|
|
DANGEROUS_PATTERNS = [
|
|
(r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
|
|
(r'\brm\s+-[^\s]*r', "recursive delete"),
|
|
(r'\brm\s+--recursive\b', "recursive delete (long flag)"),
|
|
(r'\bchmod\s+(-[^\s]*\s+)*777\b', "world-writable permissions"),
|
|
(r'\bchmod\s+--recursive\b.*777', "recursive world-writable (long flag)"),
|
|
(r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
|
|
(r'\bchown\s+--recursive\b.*root', "recursive chown to root (long flag)"),
|
|
(r'\bmkfs\b', "format filesystem"),
|
|
(r'\bdd\s+.*if=', "disk copy"),
|
|
(r'>\s*/dev/sd', "write to block device"),
|
|
(r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
|
|
(r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
|
|
(r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
|
|
(r'>\s*/etc/', "overwrite system config"),
|
|
(r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"),
|
|
(r'\bkill\s+-9\s+-1\b', "kill all processes"),
|
|
(r'\bpkill\s+-9\b', "force kill processes"),
|
|
(r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
|
|
# Any shell invocation via -c or combined flags like -lc, -ic, etc.
|
|
(r'\b(bash|sh|zsh|ksh)\s+-[^\s]*c(\s+|$)', "shell command via -c/-lc flag"),
|
|
(r'\b(python[23]?|perl|ruby|node)\s+-[ec]\s+', "script execution via -e/-c flag"),
|
|
(r'\b(curl|wget)\b.*\|\s*(ba)?sh\b', "pipe remote content to shell"),
|
|
(r'\b(bash|sh|zsh|ksh)\s+<\s*<?\s*\(\s*(curl|wget)\b', "execute remote script via process substitution"),
|
|
(r'\btee\b.*(/etc/|/dev/sd|\.ssh/|\.hermes/\.env)', "overwrite system file via tee"),
|
|
(r'\bxargs\s+.*\brm\b', "xargs with rm"),
|
|
(r'\bfind\b.*-exec\s+(/\S*/)?rm\b', "find -exec rm"),
|
|
(r'\bfind\b.*-delete\b', "find -delete"),
|
|
# Gateway protection: never start gateway outside systemd management
|
|
(r'gateway\s+run\b.*(&\s*$|&\s*;|\bdisown\b|\bsetsid\b)', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
|
|
(r'\bnohup\b.*gateway\s+run\b', "start gateway outside systemd (use 'systemctl --user restart hermes-gateway')"),
|
|
# Self-termination protection: prevent agent from killing its own process
|
|
(r'\b(pkill|killall)\b.*\b(hermes|gateway|cli\.py)\b', "kill hermes/gateway process (self-termination)"),
|
|
]
|
|
|
|
|
|
def _legacy_pattern_key(pattern: str) -> str:
|
|
"""Reproduce the old regex-derived approval key for backwards compatibility."""
|
|
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
|
|
|
|
|
|
_PATTERN_KEY_ALIASES: dict[str, set[str]] = {}
|
|
for _pattern, _description in DANGEROUS_PATTERNS:
|
|
_legacy_key = _legacy_pattern_key(_pattern)
|
|
_canonical_key = _description
|
|
_PATTERN_KEY_ALIASES.setdefault(_canonical_key, set()).update({_canonical_key, _legacy_key})
|
|
_PATTERN_KEY_ALIASES.setdefault(_legacy_key, set()).update({_legacy_key, _canonical_key})
|
|
|
|
|
|
def _approval_key_aliases(pattern_key: str) -> set[str]:
|
|
"""Return all approval keys that should match this pattern.
|
|
|
|
New approvals use the human-readable description string, but older
|
|
command_allowlist entries and session approvals may still contain the
|
|
historical regex-derived key.
|
|
"""
|
|
return _PATTERN_KEY_ALIASES.get(pattern_key, {pattern_key})
|
|
|
|
|
|
# =========================================================================
|
|
# Detection
|
|
# =========================================================================
|
|
|
|
def _normalize_command_for_detection(command: str) -> str:
|
|
"""Normalize a command string before dangerous-pattern matching.
|
|
|
|
Strips ANSI escape sequences (full ECMA-48 via tools.ansi_strip),
|
|
null bytes, and normalizes Unicode fullwidth characters so that
|
|
obfuscation techniques cannot bypass the pattern-based detection.
|
|
"""
|
|
from tools.ansi_strip import strip_ansi
|
|
|
|
# Strip all ANSI escape sequences (CSI, OSC, DCS, 8-bit C1, etc.)
|
|
command = strip_ansi(command)
|
|
# Strip null bytes
|
|
command = command.replace('\x00', '')
|
|
# Normalize Unicode (fullwidth Latin, halfwidth Katakana, etc.)
|
|
command = unicodedata.normalize('NFKC', command)
|
|
return command
|
|
|
|
|
|
def detect_dangerous_command(command: str) -> tuple:
|
|
"""Check if a command matches any dangerous patterns.
|
|
|
|
Returns:
|
|
(is_dangerous, pattern_key, description) or (False, None, None)
|
|
"""
|
|
command_lower = _normalize_command_for_detection(command).lower()
|
|
for pattern, description in DANGEROUS_PATTERNS:
|
|
if re.search(pattern, command_lower, re.IGNORECASE | re.DOTALL):
|
|
pattern_key = description
|
|
return (True, pattern_key, description)
|
|
return (False, None, None)
|
|
|
|
|
|
# =========================================================================
|
|
# Per-session approval state (thread-safe)
|
|
# =========================================================================
|
|
|
|
_lock = threading.Lock()
|
|
_pending: dict[str, dict] = {}
|
|
_session_approved: dict[str, set] = {}
|
|
_permanent_approved: set = set()
|
|
|
|
|
|
def submit_pending(session_key: str, approval: dict):
|
|
"""Store a pending approval request for a session."""
|
|
with _lock:
|
|
_pending[session_key] = approval
|
|
|
|
|
|
def pop_pending(session_key: str) -> Optional[dict]:
|
|
"""Retrieve and remove a pending approval for a session."""
|
|
with _lock:
|
|
return _pending.pop(session_key, None)
|
|
|
|
|
|
def has_pending(session_key: str) -> bool:
|
|
"""Check if a session has a pending approval request."""
|
|
with _lock:
|
|
return session_key in _pending
|
|
|
|
|
|
def approve_session(session_key: str, pattern_key: str):
|
|
"""Approve a pattern for this session only."""
|
|
with _lock:
|
|
_session_approved.setdefault(session_key, set()).add(pattern_key)
|
|
|
|
|
|
def is_approved(session_key: str, pattern_key: str) -> bool:
|
|
"""Check if a pattern is approved (session-scoped or permanent).
|
|
|
|
Accept both the current canonical key and the legacy regex-derived key so
|
|
existing command_allowlist entries continue to work after key migrations.
|
|
"""
|
|
aliases = _approval_key_aliases(pattern_key)
|
|
with _lock:
|
|
if any(alias in _permanent_approved for alias in aliases):
|
|
return True
|
|
session_approvals = _session_approved.get(session_key, set())
|
|
return any(alias in session_approvals for alias in aliases)
|
|
|
|
|
|
def approve_permanent(pattern_key: str):
|
|
"""Add a pattern to the permanent allowlist."""
|
|
with _lock:
|
|
_permanent_approved.add(pattern_key)
|
|
|
|
|
|
def load_permanent(patterns: set):
|
|
"""Bulk-load permanent allowlist entries from config."""
|
|
with _lock:
|
|
_permanent_approved.update(patterns)
|
|
|
|
|
|
def clear_session(session_key: str):
|
|
"""Clear all approvals and pending requests for a session."""
|
|
with _lock:
|
|
_session_approved.pop(session_key, None)
|
|
_pending.pop(session_key, None)
|
|
|
|
|
|
# =========================================================================
|
|
# Config persistence for permanent allowlist
|
|
# =========================================================================
|
|
|
|
def load_permanent_allowlist() -> set:
|
|
"""Load permanently allowed command patterns from config.
|
|
|
|
Also syncs them into the approval module so is_approved() works for
|
|
patterns added via 'always' in a previous session.
|
|
"""
|
|
try:
|
|
from hermes_cli.config import load_config
|
|
config = load_config()
|
|
patterns = set(config.get("command_allowlist", []) or [])
|
|
if patterns:
|
|
load_permanent(patterns)
|
|
return patterns
|
|
except Exception:
|
|
return set()
|
|
|
|
|
|
def save_permanent_allowlist(patterns: set):
|
|
"""Save permanently allowed command patterns to config."""
|
|
try:
|
|
from hermes_cli.config import load_config, save_config
|
|
config = load_config()
|
|
config["command_allowlist"] = list(patterns)
|
|
save_config(config)
|
|
except Exception as e:
|
|
logger.warning("Could not save allowlist: %s", e)
|
|
|
|
|
|
# =========================================================================
|
|
# Approval prompting + orchestration
|
|
# =========================================================================
|
|
|
|
def prompt_dangerous_approval(command: str, description: str,
|
|
timeout_seconds: int = 60,
|
|
allow_permanent: bool = True,
|
|
approval_callback=None) -> str:
|
|
"""Prompt the user to approve a dangerous command (CLI only).
|
|
|
|
Args:
|
|
allow_permanent: When False, hide the [a]lways option (used when
|
|
tirith warnings are present, since broad permanent allowlisting
|
|
is inappropriate for content-level security findings).
|
|
approval_callback: Optional callback registered by the CLI for
|
|
prompt_toolkit integration. Signature:
|
|
(command, description, *, allow_permanent=True) -> str.
|
|
|
|
Returns: 'once', 'session', 'always', or 'deny'
|
|
"""
|
|
if approval_callback is not None:
|
|
try:
|
|
return approval_callback(command, description,
|
|
allow_permanent=allow_permanent)
|
|
except Exception:
|
|
return "deny"
|
|
|
|
os.environ["HERMES_SPINNER_PAUSE"] = "1"
|
|
try:
|
|
while True:
|
|
print()
|
|
print(f" ⚠️ DANGEROUS COMMAND: {description}")
|
|
print(f" {command}")
|
|
print()
|
|
if allow_permanent:
|
|
print(" [o]nce | [s]ession | [a]lways | [d]eny")
|
|
else:
|
|
print(" [o]nce | [s]ession | [d]eny")
|
|
print()
|
|
sys.stdout.flush()
|
|
|
|
result = {"choice": ""}
|
|
|
|
def get_input():
|
|
try:
|
|
prompt = " Choice [o/s/a/D]: " if allow_permanent else " Choice [o/s/D]: "
|
|
result["choice"] = input(prompt).strip().lower()
|
|
except (EOFError, OSError):
|
|
result["choice"] = ""
|
|
|
|
thread = threading.Thread(target=get_input, daemon=True)
|
|
thread.start()
|
|
thread.join(timeout=timeout_seconds)
|
|
|
|
if thread.is_alive():
|
|
print("\n ⏱ Timeout - denying command")
|
|
return "deny"
|
|
|
|
choice = result["choice"]
|
|
if choice in ('o', 'once'):
|
|
print(" ✓ Allowed once")
|
|
return "once"
|
|
elif choice in ('s', 'session'):
|
|
print(" ✓ Allowed for this session")
|
|
return "session"
|
|
elif choice in ('a', 'always'):
|
|
if not allow_permanent:
|
|
print(" ✓ Allowed for this session")
|
|
return "session"
|
|
print(" ✓ Added to permanent allowlist")
|
|
return "always"
|
|
else:
|
|
print(" ✗ Denied")
|
|
return "deny"
|
|
|
|
except (EOFError, KeyboardInterrupt):
|
|
print("\n ✗ Cancelled")
|
|
return "deny"
|
|
finally:
|
|
if "HERMES_SPINNER_PAUSE" in os.environ:
|
|
del os.environ["HERMES_SPINNER_PAUSE"]
|
|
print()
|
|
sys.stdout.flush()
|
|
|
|
|
|
def _normalize_approval_mode(mode) -> str:
|
|
"""Normalize approval mode values loaded from YAML/config.
|
|
|
|
YAML 1.1 treats bare words like `off` as booleans, so a config entry like
|
|
`approvals:\n mode: off` is parsed as False unless quoted. Treat that as the
|
|
intended string mode instead of falling back to manual approvals.
|
|
"""
|
|
if isinstance(mode, bool):
|
|
return "off" if mode is False else "manual"
|
|
if isinstance(mode, str):
|
|
normalized = mode.strip().lower()
|
|
return normalized or "manual"
|
|
return "manual"
|
|
|
|
|
|
def _get_approval_mode() -> str:
|
|
"""Read the approval mode from config. Returns 'manual', 'smart', or 'off'."""
|
|
try:
|
|
from hermes_cli.config import load_config
|
|
config = load_config()
|
|
mode = config.get("approvals", {}).get("mode", "manual")
|
|
return _normalize_approval_mode(mode)
|
|
except Exception:
|
|
return "manual"
|
|
|
|
|
|
def _smart_approve(command: str, description: str) -> str:
|
|
"""Use the auxiliary LLM to assess risk and decide approval.
|
|
|
|
Returns 'approve' if the LLM determines the command is safe,
|
|
'deny' if genuinely dangerous, or 'escalate' if uncertain.
|
|
|
|
Inspired by OpenAI Codex's Smart Approvals guardian subagent
|
|
(openai/codex#13860).
|
|
"""
|
|
try:
|
|
from agent.auxiliary_client import get_text_auxiliary_client, auxiliary_max_tokens_param
|
|
|
|
client, model = get_text_auxiliary_client(task="approval")
|
|
if not client or not model:
|
|
logger.debug("Smart approvals: no aux client available, escalating")
|
|
return "escalate"
|
|
|
|
prompt = f"""You are a security reviewer for an AI coding agent. A terminal command was flagged by pattern matching as potentially dangerous.
|
|
|
|
Command: {command}
|
|
Flagged reason: {description}
|
|
|
|
Assess the ACTUAL risk of this command. Many flagged commands are false positives — for example, `python -c "print('hello')"` is flagged as "script execution via -c flag" but is completely harmless.
|
|
|
|
Rules:
|
|
- APPROVE if the command is clearly safe (benign script execution, safe file operations, development tools, package installs, git operations, etc.)
|
|
- DENY if the command could genuinely damage the system (recursive delete of important paths, overwriting system files, fork bombs, wiping disks, dropping databases, etc.)
|
|
- ESCALATE if you're uncertain
|
|
|
|
Respond with exactly one word: APPROVE, DENY, or ESCALATE"""
|
|
|
|
response = client.chat.completions.create(
|
|
model=model,
|
|
messages=[{"role": "user", "content": prompt}],
|
|
**auxiliary_max_tokens_param(16),
|
|
temperature=0,
|
|
)
|
|
|
|
answer = (response.choices[0].message.content or "").strip().upper()
|
|
|
|
if "APPROVE" in answer:
|
|
return "approve"
|
|
elif "DENY" in answer:
|
|
return "deny"
|
|
else:
|
|
return "escalate"
|
|
|
|
except Exception as e:
|
|
logger.debug("Smart approvals: LLM call failed (%s), escalating", e)
|
|
return "escalate"
|
|
|
|
|
|
def check_dangerous_command(command: str, env_type: str,
|
|
approval_callback=None) -> dict:
|
|
"""Check if a command is dangerous and handle approval.
|
|
|
|
This is the main entry point called by terminal_tool before executing
|
|
any command. It orchestrates detection, session checks, and prompting.
|
|
|
|
Args:
|
|
command: The shell command to check.
|
|
env_type: Terminal backend type ('local', 'ssh', 'docker', etc.).
|
|
approval_callback: Optional CLI callback for interactive prompts.
|
|
|
|
Returns:
|
|
{"approved": True/False, "message": str or None, ...}
|
|
"""
|
|
if env_type in ("docker", "singularity", "modal", "daytona"):
|
|
return {"approved": True, "message": None}
|
|
|
|
# --yolo: bypass all approval prompts
|
|
if os.getenv("HERMES_YOLO_MODE"):
|
|
return {"approved": True, "message": None}
|
|
|
|
is_dangerous, pattern_key, description = detect_dangerous_command(command)
|
|
if not is_dangerous:
|
|
return {"approved": True, "message": None}
|
|
|
|
session_key = os.getenv("HERMES_SESSION_KEY", "default")
|
|
if is_approved(session_key, pattern_key):
|
|
return {"approved": True, "message": None}
|
|
|
|
is_cli = os.getenv("HERMES_INTERACTIVE")
|
|
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
|
|
|
if not is_cli and not is_gateway:
|
|
return {"approved": True, "message": None}
|
|
|
|
if is_gateway or os.getenv("HERMES_EXEC_ASK"):
|
|
submit_pending(session_key, {
|
|
"command": command,
|
|
"pattern_key": pattern_key,
|
|
"description": description,
|
|
})
|
|
return {
|
|
"approved": False,
|
|
"pattern_key": pattern_key,
|
|
"status": "approval_required",
|
|
"command": command,
|
|
"description": description,
|
|
"message": (
|
|
f"⚠️ This command is potentially dangerous ({description}). "
|
|
f"Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
|
|
),
|
|
}
|
|
|
|
choice = prompt_dangerous_approval(command, description,
|
|
approval_callback=approval_callback)
|
|
|
|
if choice == "deny":
|
|
return {
|
|
"approved": False,
|
|
"message": f"BLOCKED: User denied this potentially dangerous command (matched '{description}' pattern). Do NOT retry this command - the user has explicitly rejected it.",
|
|
"pattern_key": pattern_key,
|
|
"description": description,
|
|
}
|
|
|
|
if choice == "session":
|
|
approve_session(session_key, pattern_key)
|
|
elif choice == "always":
|
|
approve_session(session_key, pattern_key)
|
|
approve_permanent(pattern_key)
|
|
save_permanent_allowlist(_permanent_approved)
|
|
|
|
return {"approved": True, "message": None}
|
|
|
|
|
|
# =========================================================================
|
|
# Combined pre-exec guard (tirith + dangerous command detection)
|
|
# =========================================================================
|
|
|
|
def _format_tirith_description(tirith_result: dict) -> str:
|
|
"""Build a human-readable description from tirith findings.
|
|
|
|
Includes severity, title, and description for each finding so users
|
|
can make an informed approval decision.
|
|
"""
|
|
findings = tirith_result.get("findings") or []
|
|
if not findings:
|
|
summary = tirith_result.get("summary") or "security issue detected"
|
|
return f"Security scan: {summary}"
|
|
|
|
parts = []
|
|
for f in findings:
|
|
severity = f.get("severity", "")
|
|
title = f.get("title", "")
|
|
desc = f.get("description", "")
|
|
if title and desc:
|
|
parts.append(f"[{severity}] {title}: {desc}" if severity else f"{title}: {desc}")
|
|
elif title:
|
|
parts.append(f"[{severity}] {title}" if severity else title)
|
|
if not parts:
|
|
summary = tirith_result.get("summary") or "security issue detected"
|
|
return f"Security scan: {summary}"
|
|
|
|
return "Security scan — " + "; ".join(parts)
|
|
|
|
|
|
def check_all_command_guards(command: str, env_type: str,
|
|
approval_callback=None) -> dict:
|
|
"""Run all pre-exec security checks and return a single approval decision.
|
|
|
|
Gathers findings from tirith and dangerous-command detection, then
|
|
presents them as a single combined approval request. This prevents
|
|
a gateway force=True replay from bypassing one check when only the
|
|
other was shown to the user.
|
|
"""
|
|
# Skip containers for both checks
|
|
if env_type in ("docker", "singularity", "modal", "daytona"):
|
|
return {"approved": True, "message": None}
|
|
|
|
# --yolo or approvals.mode=off: bypass all approval prompts
|
|
approval_mode = _get_approval_mode()
|
|
if os.getenv("HERMES_YOLO_MODE") or approval_mode == "off":
|
|
return {"approved": True, "message": None}
|
|
|
|
is_cli = os.getenv("HERMES_INTERACTIVE")
|
|
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
|
|
is_ask = os.getenv("HERMES_EXEC_ASK")
|
|
|
|
# Preserve the existing non-interactive behavior: outside CLI/gateway/ask
|
|
# flows, we do not block on approvals and we skip external guard work.
|
|
if not is_cli and not is_gateway and not is_ask:
|
|
return {"approved": True, "message": None}
|
|
|
|
# --- Phase 1: Gather findings from both checks ---
|
|
|
|
# Tirith check — wrapper guarantees no raise for expected failures.
|
|
# Only catch ImportError (module not installed).
|
|
tirith_result = {"action": "allow", "findings": [], "summary": ""}
|
|
try:
|
|
from tools.tirith_security import check_command_security
|
|
tirith_result = check_command_security(command)
|
|
except ImportError:
|
|
pass # tirith module not installed — allow
|
|
|
|
# Dangerous command check (detection only, no approval)
|
|
is_dangerous, pattern_key, description = detect_dangerous_command(command)
|
|
|
|
# --- Phase 2: Decide ---
|
|
|
|
# Collect warnings that need approval
|
|
warnings = [] # list of (pattern_key, description, is_tirith)
|
|
|
|
session_key = os.getenv("HERMES_SESSION_KEY", "default")
|
|
|
|
# Tirith block/warn → approvable warning with rich findings.
|
|
# Previously, tirith "block" was a hard block with no approval prompt.
|
|
# Now both block and warn go through the approval flow so users can
|
|
# inspect the explanation and approve if they understand the risk.
|
|
if tirith_result["action"] in ("block", "warn"):
|
|
findings = tirith_result.get("findings") or []
|
|
rule_id = findings[0].get("rule_id", "unknown") if findings else "unknown"
|
|
tirith_key = f"tirith:{rule_id}"
|
|
tirith_desc = _format_tirith_description(tirith_result)
|
|
if not is_approved(session_key, tirith_key):
|
|
warnings.append((tirith_key, tirith_desc, True))
|
|
|
|
if is_dangerous:
|
|
if not is_approved(session_key, pattern_key):
|
|
warnings.append((pattern_key, description, False))
|
|
|
|
# Nothing to warn about
|
|
if not warnings:
|
|
return {"approved": True, "message": None}
|
|
|
|
# --- Phase 2.5: Smart approval (auxiliary LLM risk assessment) ---
|
|
# When approvals.mode=smart, ask the aux LLM before prompting the user.
|
|
# Inspired by OpenAI Codex's Smart Approvals guardian subagent
|
|
# (openai/codex#13860).
|
|
if approval_mode == "smart":
|
|
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
|
|
verdict = _smart_approve(command, combined_desc_for_llm)
|
|
if verdict == "approve":
|
|
# Auto-approve and grant session-level approval for these patterns
|
|
for key, _, _ in warnings:
|
|
approve_session(session_key, key)
|
|
logger.debug("Smart approval: auto-approved '%s' (%s)",
|
|
command[:60], combined_desc_for_llm)
|
|
return {"approved": True, "message": None,
|
|
"smart_approved": True}
|
|
elif verdict == "deny":
|
|
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
|
|
return {
|
|
"approved": False,
|
|
"message": f"BLOCKED by smart approval: {combined_desc_for_llm}. "
|
|
"The command was assessed as genuinely dangerous. Do NOT retry.",
|
|
"smart_denied": True,
|
|
}
|
|
# verdict == "escalate" → fall through to manual prompt
|
|
|
|
# --- Phase 3: Approval ---
|
|
|
|
# Combine descriptions for a single approval prompt
|
|
combined_desc = "; ".join(desc for _, desc, _ in warnings)
|
|
primary_key = warnings[0][0]
|
|
all_keys = [key for key, _, _ in warnings]
|
|
has_tirith = any(is_t for _, _, is_t in warnings)
|
|
|
|
# Gateway/async: single approval_required with combined description
|
|
# Store all pattern keys so gateway replay approves all of them
|
|
if is_gateway or is_ask:
|
|
submit_pending(session_key, {
|
|
"command": command,
|
|
"pattern_key": primary_key, # backward compat
|
|
"pattern_keys": all_keys, # all keys for replay
|
|
"description": combined_desc,
|
|
})
|
|
return {
|
|
"approved": False,
|
|
"pattern_key": primary_key,
|
|
"status": "approval_required",
|
|
"command": command,
|
|
"description": combined_desc,
|
|
"message": (
|
|
f"⚠️ {combined_desc}. Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
|
|
),
|
|
}
|
|
|
|
# CLI interactive: single combined prompt
|
|
# Hide [a]lways when any tirith warning is present
|
|
choice = prompt_dangerous_approval(command, combined_desc,
|
|
allow_permanent=not has_tirith,
|
|
approval_callback=approval_callback)
|
|
|
|
if choice == "deny":
|
|
return {
|
|
"approved": False,
|
|
"message": "BLOCKED: User denied. Do NOT retry.",
|
|
"pattern_key": primary_key,
|
|
"description": combined_desc,
|
|
}
|
|
|
|
# Persist approval for each warning individually
|
|
for key, _, is_tirith in warnings:
|
|
if choice == "session" or (choice == "always" and is_tirith):
|
|
# tirith: session only (no permanent broad allowlisting)
|
|
approve_session(session_key, key)
|
|
elif choice == "always":
|
|
# dangerous patterns: permanent allowed
|
|
approve_session(session_key, key)
|
|
approve_permanent(key)
|
|
save_permanent_allowlist(_permanent_approved)
|
|
|
|
return {"approved": True, "message": None}
|