Files
hermes-agent/tools/approval.py
Teknium 4cb6735541 fix(approval): show full command in dangerous command approval (#1553)
* fix: prevent infinite 400 failure loop on context overflow (#1630)

When a gateway session exceeds the model's context window, Anthropic may
return a generic 400 invalid_request_error with just 'Error' as the
message.  This bypassed the phrase-based context-length detection,
causing the agent to treat it as a non-retryable client error.  Worse,
the failed user message was still persisted to the transcript, making
the session even larger on each attempt — creating an infinite loop.

Three-layer fix:

1. run_agent.py — Fallback heuristic: when a 400 error has a very short
   generic message AND the session is large (>40% of context or >80
   messages), treat it as a probable context overflow and trigger
   compression instead of aborting.

2. run_agent.py + gateway/run.py — Don't persist failed messages:
   when the agent returns failed=True before generating any response,
   skip writing the user's message to the transcript/DB. This prevents
   the session from growing on each failure.

3. gateway/run.py — Smarter error messages: detect context-overflow
   failures and suggest /compact or /reset specifically, instead of a
   generic 'try again' that will fail identically.

* fix(skills): detect prompt injection patterns and block cache file reads

Adds two security layers to prevent prompt injection via skills hub
cache files (#1558):

1. read_file: blocks direct reads of ~/.hermes/skills/.hub/ directory
   (index-cache, catalog files). The 3.5MB clawhub_catalog_v1.json
   was the original injection vector — untrusted skill descriptions
   in the catalog contained adversarial text that the model executed.

2. skill_view: warns when skills are loaded from outside the trusted
   ~/.hermes/skills/ directory, and detects common injection patterns
   in skill content ("ignore previous instructions", "<system>", etc.).

Cherry-picked from PR #1562 by ygd58.

* fix(tools): chunk long messages in send_message_tool before dispatch (#1552)

Long messages sent via send_message tool or cron delivery silently
failed when exceeding platform limits. Gateway adapters handle this
via truncate_message(), but the standalone senders in send_message_tool
bypassed that entirely.

- Apply truncate_message() chunking in _send_to_platform() before
  dispatching to individual platform senders
- Remove naive message[i:i+2000] character split in _send_discord()
  in favor of centralized smart splitting
- Attach media files to last chunk only for Telegram
- Add regression tests for chunking and media placement

Cherry-picked from PR #1557 by llbn.

* fix(approval): show full command in dangerous command approval (#1553)

Previously the command was truncated to 80 chars in CLI (with a
[v]iew full option), 500 chars in Discord embeds, and missing entirely
in Telegram/Slack approval messages. Now the full command is always
displayed everywhere:

- CLI: removed 80-char truncation and [v]iew full menu option
- Gateway (TG/Slack): approval_required message includes full command
  in a code block
- Discord: embed shows full command up to 4096-char limit
- Windows: skip SIGALRM-based test timeout (Unix-only)
- Updated tests: replaced view-flow tests with direct approval tests

Cherry-picked from PR #1566 by crazywriter1.

---------

Co-authored-by: buray <ygd58@users.noreply.github.com>
Co-authored-by: lbn <llbn@users.noreply.github.com>
Co-authored-by: crazywriter1 <53251494+crazywriter1@users.noreply.github.com>
2026-03-17 02:02:33 -07:00

571 lines
22 KiB
Python

"""Dangerous command approval -- detection, prompting, and per-session state.
This module is the single source of truth for the dangerous command system:
- Pattern detection (DANGEROUS_PATTERNS, detect_dangerous_command)
- Per-session approval state (thread-safe, keyed by session_key)
- Approval prompting (CLI interactive + gateway async)
- Smart approval via auxiliary LLM (auto-approve low-risk commands)
- Permanent allowlist persistence (config.yaml)
"""
import logging
import os
import re
import sys
import threading
from typing import Optional
logger = logging.getLogger(__name__)
# =========================================================================
# Dangerous command patterns
# =========================================================================
DANGEROUS_PATTERNS = [
(r'\brm\s+(-[^\s]*\s+)*/', "delete in root path"),
(r'\brm\s+-[^\s]*r', "recursive delete"),
(r'\brm\s+--recursive\b', "recursive delete (long flag)"),
(r'\bchmod\s+(-[^\s]*\s+)*777\b', "world-writable permissions"),
(r'\bchmod\s+--recursive\b.*777', "recursive world-writable (long flag)"),
(r'\bchown\s+(-[^\s]*)?R\s+root', "recursive chown to root"),
(r'\bchown\s+--recursive\b.*root', "recursive chown to root (long flag)"),
(r'\bmkfs\b', "format filesystem"),
(r'\bdd\s+.*if=', "disk copy"),
(r'>\s*/dev/sd', "write to block device"),
(r'\bDROP\s+(TABLE|DATABASE)\b', "SQL DROP"),
(r'\bDELETE\s+FROM\b(?!.*\bWHERE\b)', "SQL DELETE without WHERE"),
(r'\bTRUNCATE\s+(TABLE)?\s*\w', "SQL TRUNCATE"),
(r'>\s*/etc/', "overwrite system config"),
(r'\bsystemctl\s+(stop|disable|mask)\b', "stop/disable system service"),
(r'\bkill\s+-9\s+-1\b', "kill all processes"),
(r'\bpkill\s+-9\b', "force kill processes"),
(r':\(\)\s*\{\s*:\s*\|\s*:\s*&\s*\}\s*;\s*:', "fork bomb"),
(r'\b(bash|sh|zsh)\s+-c\s+', "shell command via -c flag"),
(r'\b(python[23]?|perl|ruby|node)\s+-[ec]\s+', "script execution via -e/-c flag"),
(r'\b(curl|wget)\b.*\|\s*(ba)?sh\b', "pipe remote content to shell"),
(r'\b(bash|sh|zsh|ksh)\s+<\s*<?\s*\(\s*(curl|wget)\b', "execute remote script via process substitution"),
(r'\btee\b.*(/etc/|/dev/sd|\.ssh/|\.hermes/\.env)', "overwrite system file via tee"),
(r'\bxargs\s+.*\brm\b', "xargs with rm"),
(r'\bfind\b.*-exec\s+(/\S*/)?rm\b', "find -exec rm"),
(r'\bfind\b.*-delete\b', "find -delete"),
]
def _legacy_pattern_key(pattern: str) -> str:
"""Reproduce the old regex-derived approval key for backwards compatibility."""
return pattern.split(r'\b')[1] if r'\b' in pattern else pattern[:20]
_PATTERN_KEY_ALIASES: dict[str, set[str]] = {}
for _pattern, _description in DANGEROUS_PATTERNS:
_legacy_key = _legacy_pattern_key(_pattern)
_canonical_key = _description
_PATTERN_KEY_ALIASES.setdefault(_canonical_key, set()).update({_canonical_key, _legacy_key})
_PATTERN_KEY_ALIASES.setdefault(_legacy_key, set()).update({_legacy_key, _canonical_key})
def _approval_key_aliases(pattern_key: str) -> set[str]:
"""Return all approval keys that should match this pattern.
New approvals use the human-readable description string, but older
command_allowlist entries and session approvals may still contain the
historical regex-derived key.
"""
return _PATTERN_KEY_ALIASES.get(pattern_key, {pattern_key})
# =========================================================================
# Detection
# =========================================================================
def detect_dangerous_command(command: str) -> tuple:
"""Check if a command matches any dangerous patterns.
Returns:
(is_dangerous, pattern_key, description) or (False, None, None)
"""
command_lower = command.lower()
for pattern, description in DANGEROUS_PATTERNS:
if re.search(pattern, command_lower, re.IGNORECASE | re.DOTALL):
pattern_key = description
return (True, pattern_key, description)
return (False, None, None)
# =========================================================================
# Per-session approval state (thread-safe)
# =========================================================================
_lock = threading.Lock()
_pending: dict[str, dict] = {}
_session_approved: dict[str, set] = {}
_permanent_approved: set = set()
def submit_pending(session_key: str, approval: dict):
"""Store a pending approval request for a session."""
with _lock:
_pending[session_key] = approval
def pop_pending(session_key: str) -> Optional[dict]:
"""Retrieve and remove a pending approval for a session."""
with _lock:
return _pending.pop(session_key, None)
def has_pending(session_key: str) -> bool:
"""Check if a session has a pending approval request."""
with _lock:
return session_key in _pending
def approve_session(session_key: str, pattern_key: str):
"""Approve a pattern for this session only."""
with _lock:
_session_approved.setdefault(session_key, set()).add(pattern_key)
def is_approved(session_key: str, pattern_key: str) -> bool:
"""Check if a pattern is approved (session-scoped or permanent).
Accept both the current canonical key and the legacy regex-derived key so
existing command_allowlist entries continue to work after key migrations.
"""
aliases = _approval_key_aliases(pattern_key)
with _lock:
if any(alias in _permanent_approved for alias in aliases):
return True
session_approvals = _session_approved.get(session_key, set())
return any(alias in session_approvals for alias in aliases)
def approve_permanent(pattern_key: str):
"""Add a pattern to the permanent allowlist."""
with _lock:
_permanent_approved.add(pattern_key)
def load_permanent(patterns: set):
"""Bulk-load permanent allowlist entries from config."""
with _lock:
_permanent_approved.update(patterns)
def clear_session(session_key: str):
"""Clear all approvals and pending requests for a session."""
with _lock:
_session_approved.pop(session_key, None)
_pending.pop(session_key, None)
# =========================================================================
# Config persistence for permanent allowlist
# =========================================================================
def load_permanent_allowlist() -> set:
"""Load permanently allowed command patterns from config.
Also syncs them into the approval module so is_approved() works for
patterns added via 'always' in a previous session.
"""
try:
from hermes_cli.config import load_config
config = load_config()
patterns = set(config.get("command_allowlist", []) or [])
if patterns:
load_permanent(patterns)
return patterns
except Exception:
return set()
def save_permanent_allowlist(patterns: set):
"""Save permanently allowed command patterns to config."""
try:
from hermes_cli.config import load_config, save_config
config = load_config()
config["command_allowlist"] = list(patterns)
save_config(config)
except Exception as e:
logger.warning("Could not save allowlist: %s", e)
# =========================================================================
# Approval prompting + orchestration
# =========================================================================
def prompt_dangerous_approval(command: str, description: str,
timeout_seconds: int = 60,
allow_permanent: bool = True,
approval_callback=None) -> str:
"""Prompt the user to approve a dangerous command (CLI only).
Args:
allow_permanent: When False, hide the [a]lways option (used when
tirith warnings are present, since broad permanent allowlisting
is inappropriate for content-level security findings).
approval_callback: Optional callback registered by the CLI for
prompt_toolkit integration. Signature:
(command, description, *, allow_permanent=True) -> str.
Returns: 'once', 'session', 'always', or 'deny'
"""
if approval_callback is not None:
try:
return approval_callback(command, description,
allow_permanent=allow_permanent)
except Exception:
return "deny"
os.environ["HERMES_SPINNER_PAUSE"] = "1"
try:
while True:
print()
print(f" ⚠️ DANGEROUS COMMAND: {description}")
print(f" {command}")
print()
if allow_permanent:
print(" [o]nce | [s]ession | [a]lways | [d]eny")
else:
print(" [o]nce | [s]ession | [d]eny")
print()
sys.stdout.flush()
result = {"choice": ""}
def get_input():
try:
prompt = " Choice [o/s/a/D]: " if allow_permanent else " Choice [o/s/D]: "
result["choice"] = input(prompt).strip().lower()
except (EOFError, OSError):
result["choice"] = ""
thread = threading.Thread(target=get_input, daemon=True)
thread.start()
thread.join(timeout=timeout_seconds)
if thread.is_alive():
print("\n ⏱ Timeout - denying command")
return "deny"
choice = result["choice"]
if choice in ('o', 'once'):
print(" ✓ Allowed once")
return "once"
elif choice in ('s', 'session'):
print(" ✓ Allowed for this session")
return "session"
elif choice in ('a', 'always'):
if not allow_permanent:
print(" ✓ Allowed for this session")
return "session"
print(" ✓ Added to permanent allowlist")
return "always"
else:
print(" ✗ Denied")
return "deny"
except (EOFError, KeyboardInterrupt):
print("\n ✗ Cancelled")
return "deny"
finally:
if "HERMES_SPINNER_PAUSE" in os.environ:
del os.environ["HERMES_SPINNER_PAUSE"]
print()
sys.stdout.flush()
def _get_approval_mode() -> str:
"""Read the approval mode from config. Returns 'manual', 'smart', or 'off'."""
try:
from hermes_cli.config import load_config
config = load_config()
return config.get("approvals", {}).get("mode", "manual")
except Exception:
return "manual"
def _smart_approve(command: str, description: str) -> str:
"""Use the auxiliary LLM to assess risk and decide approval.
Returns 'approve' if the LLM determines the command is safe,
'deny' if genuinely dangerous, or 'escalate' if uncertain.
Inspired by OpenAI Codex's Smart Approvals guardian subagent
(openai/codex#13860).
"""
try:
from agent.auxiliary_client import get_text_auxiliary_client, auxiliary_max_tokens_param
client, model = get_text_auxiliary_client(task="approval")
if not client or not model:
logger.debug("Smart approvals: no aux client available, escalating")
return "escalate"
prompt = f"""You are a security reviewer for an AI coding agent. A terminal command was flagged by pattern matching as potentially dangerous.
Command: {command}
Flagged reason: {description}
Assess the ACTUAL risk of this command. Many flagged commands are false positives — for example, `python -c "print('hello')"` is flagged as "script execution via -c flag" but is completely harmless.
Rules:
- APPROVE if the command is clearly safe (benign script execution, safe file operations, development tools, package installs, git operations, etc.)
- DENY if the command could genuinely damage the system (recursive delete of important paths, overwriting system files, fork bombs, wiping disks, dropping databases, etc.)
- ESCALATE if you're uncertain
Respond with exactly one word: APPROVE, DENY, or ESCALATE"""
response = client.chat.completions.create(
model=model,
messages=[{"role": "user", "content": prompt}],
**auxiliary_max_tokens_param(16),
temperature=0,
)
answer = (response.choices[0].message.content or "").strip().upper()
if "APPROVE" in answer:
return "approve"
elif "DENY" in answer:
return "deny"
else:
return "escalate"
except Exception as e:
logger.debug("Smart approvals: LLM call failed (%s), escalating", e)
return "escalate"
def check_dangerous_command(command: str, env_type: str,
approval_callback=None) -> dict:
"""Check if a command is dangerous and handle approval.
This is the main entry point called by terminal_tool before executing
any command. It orchestrates detection, session checks, and prompting.
Args:
command: The shell command to check.
env_type: Terminal backend type ('local', 'ssh', 'docker', etc.).
approval_callback: Optional CLI callback for interactive prompts.
Returns:
{"approved": True/False, "message": str or None, ...}
"""
if env_type in ("docker", "singularity", "modal", "daytona"):
return {"approved": True, "message": None}
# --yolo: bypass all approval prompts
if os.getenv("HERMES_YOLO_MODE"):
return {"approved": True, "message": None}
is_dangerous, pattern_key, description = detect_dangerous_command(command)
if not is_dangerous:
return {"approved": True, "message": None}
session_key = os.getenv("HERMES_SESSION_KEY", "default")
if is_approved(session_key, pattern_key):
return {"approved": True, "message": None}
is_cli = os.getenv("HERMES_INTERACTIVE")
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
if not is_cli and not is_gateway:
return {"approved": True, "message": None}
if is_gateway or os.getenv("HERMES_EXEC_ASK"):
submit_pending(session_key, {
"command": command,
"pattern_key": pattern_key,
"description": description,
})
return {
"approved": False,
"pattern_key": pattern_key,
"status": "approval_required",
"command": command,
"description": description,
"message": (
f"⚠️ This command is potentially dangerous ({description}). "
f"Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
),
}
choice = prompt_dangerous_approval(command, description,
approval_callback=approval_callback)
if choice == "deny":
return {
"approved": False,
"message": f"BLOCKED: User denied this potentially dangerous command (matched '{description}' pattern). Do NOT retry this command - the user has explicitly rejected it.",
"pattern_key": pattern_key,
"description": description,
}
if choice == "session":
approve_session(session_key, pattern_key)
elif choice == "always":
approve_session(session_key, pattern_key)
approve_permanent(pattern_key)
save_permanent_allowlist(_permanent_approved)
return {"approved": True, "message": None}
# =========================================================================
# Combined pre-exec guard (tirith + dangerous command detection)
# =========================================================================
def check_all_command_guards(command: str, env_type: str,
approval_callback=None) -> dict:
"""Run all pre-exec security checks and return a single approval decision.
Gathers findings from tirith and dangerous-command detection, then
presents them as a single combined approval request. This prevents
a gateway force=True replay from bypassing one check when only the
other was shown to the user.
"""
# Skip containers for both checks
if env_type in ("docker", "singularity", "modal", "daytona"):
return {"approved": True, "message": None}
# --yolo or approvals.mode=off: bypass all approval prompts
approval_mode = _get_approval_mode()
if os.getenv("HERMES_YOLO_MODE") or approval_mode == "off":
return {"approved": True, "message": None}
is_cli = os.getenv("HERMES_INTERACTIVE")
is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
is_ask = os.getenv("HERMES_EXEC_ASK")
# Preserve the existing non-interactive behavior: outside CLI/gateway/ask
# flows, we do not block on approvals and we skip external guard work.
if not is_cli and not is_gateway and not is_ask:
return {"approved": True, "message": None}
# --- Phase 1: Gather findings from both checks ---
# Tirith check — wrapper guarantees no raise for expected failures.
# Only catch ImportError (module not installed).
tirith_result = {"action": "allow", "findings": [], "summary": ""}
try:
from tools.tirith_security import check_command_security
tirith_result = check_command_security(command)
except ImportError:
pass # tirith module not installed — allow
# Dangerous command check (detection only, no approval)
is_dangerous, pattern_key, description = detect_dangerous_command(command)
# --- Phase 2: Decide ---
# If tirith blocks, block immediately (no approval possible)
if tirith_result["action"] == "block":
summary = tirith_result.get("summary") or "security issue detected"
return {
"approved": False,
"message": f"BLOCKED: Command blocked by security scan ({summary}). Do NOT retry.",
}
# Collect warnings that need approval
warnings = [] # list of (pattern_key, description, is_tirith)
session_key = os.getenv("HERMES_SESSION_KEY", "default")
if tirith_result["action"] == "warn":
findings = tirith_result.get("findings") or []
rule_id = findings[0].get("rule_id", "unknown") if findings else "unknown"
tirith_key = f"tirith:{rule_id}"
tirith_desc = f"Security scan: {tirith_result.get('summary') or 'security warning detected'}"
if not is_approved(session_key, tirith_key):
warnings.append((tirith_key, tirith_desc, True))
if is_dangerous:
if not is_approved(session_key, pattern_key):
warnings.append((pattern_key, description, False))
# Nothing to warn about
if not warnings:
return {"approved": True, "message": None}
# --- Phase 2.5: Smart approval (auxiliary LLM risk assessment) ---
# When approvals.mode=smart, ask the aux LLM before prompting the user.
# Inspired by OpenAI Codex's Smart Approvals guardian subagent
# (openai/codex#13860).
if approval_mode == "smart":
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
verdict = _smart_approve(command, combined_desc_for_llm)
if verdict == "approve":
# Auto-approve and grant session-level approval for these patterns
for key, _, _ in warnings:
approve_session(session_key, key)
logger.debug("Smart approval: auto-approved '%s' (%s)",
command[:60], combined_desc_for_llm)
return {"approved": True, "message": None,
"smart_approved": True}
elif verdict == "deny":
combined_desc_for_llm = "; ".join(desc for _, desc, _ in warnings)
return {
"approved": False,
"message": f"BLOCKED by smart approval: {combined_desc_for_llm}. "
"The command was assessed as genuinely dangerous. Do NOT retry.",
"smart_denied": True,
}
# verdict == "escalate" → fall through to manual prompt
# --- Phase 3: Approval ---
# Combine descriptions for a single approval prompt
combined_desc = "; ".join(desc for _, desc, _ in warnings)
primary_key = warnings[0][0]
all_keys = [key for key, _, _ in warnings]
has_tirith = any(is_t for _, _, is_t in warnings)
# Gateway/async: single approval_required with combined description
# Store all pattern keys so gateway replay approves all of them
if is_gateway or is_ask:
submit_pending(session_key, {
"command": command,
"pattern_key": primary_key, # backward compat
"pattern_keys": all_keys, # all keys for replay
"description": combined_desc,
})
return {
"approved": False,
"pattern_key": primary_key,
"status": "approval_required",
"command": command,
"description": combined_desc,
"message": (
f"⚠️ {combined_desc}. Asking the user for approval.\n\n**Command:**\n```\n{command}\n```"
),
}
# CLI interactive: single combined prompt
# Hide [a]lways when any tirith warning is present
choice = prompt_dangerous_approval(command, combined_desc,
allow_permanent=not has_tirith,
approval_callback=approval_callback)
if choice == "deny":
return {
"approved": False,
"message": "BLOCKED: User denied. Do NOT retry.",
"pattern_key": primary_key,
"description": combined_desc,
}
# Persist approval for each warning individually
for key, _, is_tirith in warnings:
if choice == "session" or (choice == "always" and is_tirith):
# tirith: session only (no permanent broad allowlisting)
approve_session(session_key, key)
elif choice == "always":
# dangerous patterns: permanent allowed
approve_session(session_key, key)
approve_permanent(key)
save_permanent_allowlist(_permanent_approved)
return {"approved": True, "message": None}