From 95b6bd5df62bfa4e343e83018f26189dc18040d8 Mon Sep 17 00:00:00 2001 From: Raeli Savitt Date: Wed, 25 Feb 2026 23:43:15 -0500 Subject: [PATCH] Harden agent attack surface: scan writes to memory, skills, cron, and context files MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The security scanner (skills_guard.py) was only wired into the hub install path. All other write paths to persistent state — skills created by the agent, memory entries, cron prompts, and context files — bypassed it entirely. This closes those gaps: - file_operations: deny-list blocks writes to ~/.ssh, ~/.aws, ~/.hermes/.env, etc. - code_execution_tool: filter secret env vars from sandbox child process - skill_manager_tool: wire scan_skill() into create/edit/patch/write_file with rollback - skills_guard: add "agent-created" trust level (same policy as community) - memory_tool: scan content for injection/exfil before system prompt injection - prompt_builder: scan AGENTS.md, .cursorrules, SOUL.md for prompt injection - cronjob_tools: scan cron prompts for critical threats before scheduling Co-Authored-By: Claude Opus 4.6 --- agent/prompt_builder.py | 48 +++++++++++++++++++++++++ tools/code_execution_tool.py | 9 ++++- tools/cronjob_tools.py | 41 +++++++++++++++++++++ tools/file_operations.py | 69 ++++++++++++++++++++++++++++++++---- tools/memory_tool.py | 59 ++++++++++++++++++++++++++++++ tools/skill_manager_tool.py | 59 ++++++++++++++++++++++++++++++ tools/skills_guard.py | 1 + 7 files changed, 278 insertions(+), 8 deletions(-) diff --git a/agent/prompt_builder.py b/agent/prompt_builder.py index 49395d9fd..24c26ef86 100644 --- a/agent/prompt_builder.py +++ b/agent/prompt_builder.py @@ -12,6 +12,50 @@ from typing import Optional logger = logging.getLogger(__name__) +# --------------------------------------------------------------------------- +# Context file scanning — detect prompt injection in AGENTS.md, .cursorrules, +# SOUL.md before they get injected 
into the system prompt. +# --------------------------------------------------------------------------- + +_CONTEXT_THREAT_PATTERNS = [ + (r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"), + (r'do\s+not\s+tell\s+the\s+user', "deception_hide"), + (r'system\s+prompt\s+override', "sys_prompt_override"), + (r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"), + (r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"), + (r'<!--[^>]*(?:ignore|override|system|secret|hidden)[^>]*-->', "html_comment_injection"), + (r'<\s*div\s+style\s*=\s*["\'].*display\s*:\s*none', "hidden_div"), + (r'translate\s+.*\s+into\s+.*\s+and\s+(execute|run|eval)', "translate_execute"), + (r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"), + (r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass)', "read_secrets"), +] + +_CONTEXT_INVISIBLE_CHARS = { + '\u200b', '\u200c', '\u200d', '\u2060', '\ufeff', + '\u202a', '\u202b', '\u202c', '\u202d', '\u202e', +} + + +def _scan_context_content(content: str, filename: str) -> str: + """Scan context file content for injection. Returns sanitized content.""" + findings = [] + + # Check invisible unicode + for char in _CONTEXT_INVISIBLE_CHARS: + if char in content: + findings.append(f"invisible unicode U+{ord(char):04X}") + + # Check threat patterns + for pattern, pid in _CONTEXT_THREAT_PATTERNS: + if re.search(pattern, content, re.IGNORECASE): + findings.append(pid) + + if findings: + logger.warning("Context file %s blocked: %s", filename, ", ".join(findings)) + return f"[BLOCKED: {filename} contained potential prompt injection ({', '.join(findings)}). 
Content not loaded.]" + + return content + # ========================================================================= # Constants # ========================================================================= @@ -215,6 +259,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str: content = agents_path.read_text(encoding="utf-8").strip() if content: rel_path = agents_path.relative_to(cwd_path) + content = _scan_context_content(content, str(rel_path)) total_agents_content += f"## {rel_path}\n\n{content}\n\n" except Exception as e: logger.debug("Could not read %s: %s", agents_path, e) @@ -230,6 +275,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str: try: content = cursorrules_file.read_text(encoding="utf-8").strip() if content: + content = _scan_context_content(content, ".cursorrules") cursorrules_content += f"## .cursorrules\n\n{content}\n\n" except Exception as e: logger.debug("Could not read .cursorrules: %s", e) @@ -241,6 +287,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str: try: content = mdc_file.read_text(encoding="utf-8").strip() if content: + content = _scan_context_content(content, f".cursor/rules/{mdc_file.name}") cursorrules_content += f"## .cursor/rules/{mdc_file.name}\n\n{content}\n\n" except Exception as e: logger.debug("Could not read %s: %s", mdc_file, e) @@ -265,6 +312,7 @@ def build_context_files_prompt(cwd: Optional[str] = None) -> str: try: content = soul_path.read_text(encoding="utf-8").strip() if content: + content = _scan_context_content(content, "SOUL.md") content = _truncate_content(content, "SOUL.md") sections.append( f"## SOUL.md\n\nIf SOUL.md is present, embody its persona and tone. 
" diff --git a/tools/code_execution_tool.py b/tools/code_execution_tool.py index c58951fd6..130ee6f48 100644 --- a/tools/code_execution_tool.py +++ b/tools/code_execution_tool.py @@ -381,7 +381,14 @@ def execute_code( rpc_thread.start() # --- Spawn child process --- - child_env = os.environ.copy() + # Filter out secret env vars to prevent exfiltration from sandbox + _SECRET_PATTERNS = ("KEY", "TOKEN", "SECRET", "PASSWORD", "CREDENTIAL", + "API_KEY", "OPENROUTER", "ANTHROPIC", "OPENAI", + "AWS_SECRET", "GITHUB_TOKEN") + child_env = { + k: v for k, v in os.environ.items() + if not any(pat in k.upper() for pat in _SECRET_PATTERNS) + } child_env["HERMES_RPC_SOCKET"] = sock_path child_env["PYTHONDONTWRITEBYTECODE"] = "1" diff --git a/tools/cronjob_tools.py b/tools/cronjob_tools.py index e8cde43bf..91d9a07da 100644 --- a/tools/cronjob_tools.py +++ b/tools/cronjob_tools.py @@ -10,6 +10,7 @@ The prompt must contain ALL necessary information. import json import os +import re from typing import Optional # Import from cron module (will be available when properly installed) @@ -20,6 +21,41 @@ sys.path.insert(0, str(Path(__file__).parent.parent)) from cron.jobs import create_job, get_job, list_jobs, remove_job +# --------------------------------------------------------------------------- +# Cron prompt scanning — critical-severity patterns only, since cron prompts +# run in fresh sessions with full tool access. 
+# --------------------------------------------------------------------------- + +_CRON_THREAT_PATTERNS = [ + (r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"), + (r'do\s+not\s+tell\s+the\s+user', "deception_hide"), + (r'system\s+prompt\s+override', "sys_prompt_override"), + (r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"), + (r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"), + (r'wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_wget"), + (r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass)', "read_secrets"), + (r'authorized_keys', "ssh_backdoor"), + (r'/etc/sudoers|visudo', "sudoers_mod"), + (r'rm\s+-rf\s+/', "destructive_root_rm"), +] + +_CRON_INVISIBLE_CHARS = { + '\u200b', '\u200c', '\u200d', '\u2060', '\ufeff', + '\u202a', '\u202b', '\u202c', '\u202d', '\u202e', +} + + +def _scan_cron_prompt(prompt: str) -> str: + """Scan a cron prompt for critical threats. Returns error string if blocked, else empty.""" + for char in _CRON_INVISIBLE_CHARS: + if char in prompt: + return f"Blocked: prompt contains invisible unicode U+{ord(char):04X} (possible injection)." + for pattern, pid in _CRON_THREAT_PATTERNS: + if re.search(pattern, prompt, re.IGNORECASE): + return f"Blocked: prompt matches threat pattern '{pid}'. Cron prompts must not contain injection or exfiltration payloads." 
+ return "" + + # ============================================================================= # Tool: schedule_cronjob # ============================================================================= @@ -71,6 +107,11 @@ def schedule_cronjob( Returns: JSON with job_id, next_run time, and confirmation """ + # Scan prompt for critical threats before scheduling + scan_error = _scan_cron_prompt(prompt) + if scan_error: + return json.dumps({"success": False, "error": scan_error}, indent=2) + # Get origin info from environment if available origin = None origin_platform = os.getenv("HERMES_SESSION_PLATFORM") diff --git a/tools/file_operations.py b/tools/file_operations.py index ae7dad79c..d217d54a9 100644 --- a/tools/file_operations.py +++ b/tools/file_operations.py @@ -35,6 +35,53 @@ from typing import Optional, List, Dict, Any, Tuple from pathlib import Path +# --------------------------------------------------------------------------- +# Write-path deny list — blocks writes to sensitive system/credential files +# --------------------------------------------------------------------------- + +_HOME = str(Path.home()) + +WRITE_DENIED_PATHS = { + os.path.join(_HOME, ".ssh", "authorized_keys"), + os.path.join(_HOME, ".ssh", "id_rsa"), + os.path.join(_HOME, ".ssh", "id_ed25519"), + os.path.join(_HOME, ".ssh", "config"), + os.path.join(_HOME, ".hermes", ".env"), + os.path.join(_HOME, ".bashrc"), + os.path.join(_HOME, ".zshrc"), + os.path.join(_HOME, ".profile"), + os.path.join(_HOME, ".bash_profile"), + os.path.join(_HOME, ".zprofile"), + os.path.join(_HOME, ".netrc"), + os.path.join(_HOME, ".pgpass"), + os.path.join(_HOME, ".npmrc"), + os.path.join(_HOME, ".pypirc"), + "/etc/sudoers", + "/etc/passwd", + "/etc/shadow", +} + +WRITE_DENIED_PREFIXES = [ + os.path.join(_HOME, ".ssh") + os.sep, + os.path.join(_HOME, ".aws") + os.sep, + os.path.join(_HOME, ".gnupg") + os.sep, + os.path.join(_HOME, ".kube") + os.sep, + "/etc/sudoers.d" + os.sep, + "/etc/systemd" + os.sep, +] + + 
+def _is_write_denied(path: str) -> bool: + """Return True if path is on the write deny list.""" + resolved = os.path.realpath(os.path.expanduser(path)) + if resolved in WRITE_DENIED_PATHS: + return True + for prefix in WRITE_DENIED_PREFIXES: + if resolved.startswith(prefix): + return True + return False + + # ============================================================================= # Result Data Classes # ============================================================================= @@ -564,21 +611,25 @@ class ShellFileOperations(FileOperations): def write_file(self, path: str, content: str) -> WriteResult: """ Write content to a file, creating parent directories as needed. - + Pipes content through stdin to avoid OS ARG_MAX limits on large files. The content never appears in the shell command string — only the file path does. - + Args: path: File path to write content: Content to write - + Returns: WriteResult with bytes written or error """ # Expand ~ and other shell paths path = self._expand_path(path) - + + # Block writes to sensitive paths + if _is_write_denied(path): + return WriteResult(error=f"Write denied: '{path}' is a protected system/credential file.") + # Create parent directories parent = os.path.dirname(path) dirs_created = False @@ -619,19 +670,23 @@ class ShellFileOperations(FileOperations): replace_all: bool = False) -> PatchResult: """ Replace text in a file using fuzzy matching. 
- + Args: path: File path to modify old_string: Text to find (must be unique unless replace_all=True) new_string: Replacement text replace_all: If True, replace all occurrences - + Returns: PatchResult with diff and lint results """ # Expand ~ and other shell paths path = self._expand_path(path) - + + # Block writes to sensitive paths + if _is_write_denied(path): + return PatchResult(error=f"Write denied: '{path}' is a protected system/credential file.") + # Read current content read_cmd = f"cat {self._escape_shell_arg(path)} 2>/dev/null" read_result = self._exec(read_cmd) diff --git a/tools/memory_tool.py b/tools/memory_tool.py index 99336ce16..662bd0a48 100644 --- a/tools/memory_tool.py +++ b/tools/memory_tool.py @@ -24,17 +24,66 @@ Design: """ import json +import logging import os +import re import tempfile from pathlib import Path from typing import Dict, Any, List, Optional +logger = logging.getLogger(__name__) + # Where memory files live MEMORY_DIR = Path(os.getenv("HERMES_HOME", Path.home() / ".hermes")) / "memories" ENTRY_DELIMITER = "\n§\n" +# --------------------------------------------------------------------------- +# Memory content scanning — lightweight check for injection/exfiltration +# in content that gets injected into the system prompt. 
+# --------------------------------------------------------------------------- + +_MEMORY_THREAT_PATTERNS = [ + # Prompt injection + (r'ignore\s+(previous|all|above|prior)\s+instructions', "prompt_injection"), + (r'you\s+are\s+now\s+', "role_hijack"), + (r'do\s+not\s+tell\s+the\s+user', "deception_hide"), + (r'system\s+prompt\s+override', "sys_prompt_override"), + (r'disregard\s+(your|all|any)\s+(instructions|rules|guidelines)', "disregard_rules"), + (r'act\s+as\s+(if|though)\s+you\s+(have\s+no|don\'t\s+have)\s+(restrictions|limits|rules)', "bypass_restrictions"), + # Exfiltration via curl/wget with secrets + (r'curl\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_curl"), + (r'wget\s+[^\n]*\$\{?\w*(KEY|TOKEN|SECRET|PASSWORD|CREDENTIAL|API)', "exfil_wget"), + (r'cat\s+[^\n]*(\.env|credentials|\.netrc|\.pgpass|\.npmrc|\.pypirc)', "read_secrets"), + # Persistence via shell rc + (r'authorized_keys', "ssh_backdoor"), + (r'\$HOME/\.ssh|\~/\.ssh', "ssh_access"), + (r'\$HOME/\.hermes/\.env|\~/\.hermes/\.env', "hermes_env"), +] + +# Subset of invisible chars for injection detection +_INVISIBLE_CHARS = { + '\u200b', '\u200c', '\u200d', '\u2060', '\ufeff', + '\u202a', '\u202b', '\u202c', '\u202d', '\u202e', +} + + +def _scan_memory_content(content: str) -> Optional[str]: + """Scan memory content for injection/exfil patterns. Returns error string if blocked.""" + # Check invisible unicode + for char in _INVISIBLE_CHARS: + if char in content: + return f"Blocked: content contains invisible unicode character U+{ord(char):04X} (possible injection)." + + # Check threat patterns + for pattern, pid in _MEMORY_THREAT_PATTERNS: + if re.search(pattern, content, re.IGNORECASE): + return f"Blocked: content matches threat pattern '{pid}'. Memory entries are injected into the system prompt and must not contain injection or exfiltration payloads." + + return None + + class MemoryStore: """ Bounded curated memory with file persistence. One instance per AIAgent. 
@@ -108,6 +157,11 @@ class MemoryStore: if not content: return {"success": False, "error": "Content cannot be empty."} + # Scan for injection/exfiltration before accepting + scan_error = _scan_memory_content(content) + if scan_error: + return {"success": False, "error": scan_error} + entries = self._entries_for(target) limit = self._char_limit(target) @@ -147,6 +201,11 @@ class MemoryStore: if not new_content: return {"success": False, "error": "new_content cannot be empty. Use 'remove' to delete entries."} + # Scan replacement content for injection/exfiltration + scan_error = _scan_memory_content(new_content) + if scan_error: + return {"success": False, "error": scan_error} + entries = self._entries_for(target) matches = [(i, e) for i, e in enumerate(entries) if old_text in e] diff --git a/tools/skill_manager_tool.py b/tools/skill_manager_tool.py index fbc38ae8c..29bf1be5c 100644 --- a/tools/skill_manager_tool.py +++ b/tools/skill_manager_tool.py @@ -33,12 +33,38 @@ Directory layout for user skills: """ import json +import logging import os import re import shutil from pathlib import Path from typing import Dict, Any, Optional +logger = logging.getLogger(__name__) + +# Import security scanner — agent-created skills get the same scrutiny as +# community hub installs. +try: + from tools.skills_guard import scan_skill, should_allow_install, format_scan_report + _GUARD_AVAILABLE = True +except ImportError: + _GUARD_AVAILABLE = False + + +def _security_scan_skill(skill_dir: Path) -> Optional[str]: + """Scan a skill directory after write. 
Returns error string if blocked, else None.""" + if not _GUARD_AVAILABLE: + return None + try: + result = scan_skill(skill_dir, source="agent-created") + allowed, reason = should_allow_install(result) + if not allowed: + report = format_scan_report(result) + return f"Security scan blocked this skill ({reason}):\n{report}" + except Exception as e: + logger.warning("Security scan failed for %s: %s", skill_dir, e) + return None + import yaml @@ -196,6 +222,12 @@ def _create_skill(name: str, content: str, category: str = None) -> Dict[str, An skill_md = skill_dir / "SKILL.md" skill_md.write_text(content, encoding="utf-8") + # Security scan — roll back on block + scan_error = _security_scan_skill(skill_dir) + if scan_error: + shutil.rmtree(skill_dir, ignore_errors=True) + return {"success": False, "error": scan_error} + result = { "success": True, "message": f"Skill '{name}' created.", @@ -222,8 +254,17 @@ def _edit_skill(name: str, content: str) -> Dict[str, Any]: return {"success": False, "error": f"Skill '{name}' not found. 
Use skills_list() to see available skills."} skill_md = existing["path"] / "SKILL.md" + # Back up original content for rollback + original_content = skill_md.read_text(encoding="utf-8") if skill_md.exists() else None skill_md.write_text(content, encoding="utf-8") + # Security scan — roll back on block + scan_error = _security_scan_skill(existing["path"]) + if scan_error: + if original_content is not None: + skill_md.write_text(original_content, encoding="utf-8") + return {"success": False, "error": scan_error} + return { "success": True, "message": f"Skill '{name}' updated.", @@ -300,8 +341,15 @@ def _patch_skill( "error": f"Patch would break SKILL.md structure: {err}", } + original_content = content # for rollback target.write_text(new_content, encoding="utf-8") + # Security scan — roll back on block + scan_error = _security_scan_skill(skill_dir) + if scan_error: + target.write_text(original_content, encoding="utf-8") + return {"success": False, "error": scan_error} + replacements = count if replace_all else 1 return { "success": True, @@ -344,8 +392,19 @@ def _write_file(name: str, file_path: str, file_content: str) -> Dict[str, Any]: target = existing["path"] / file_path target.parent.mkdir(parents=True, exist_ok=True) + # Back up for rollback + original_content = target.read_text(encoding="utf-8") if target.exists() else None target.write_text(file_content, encoding="utf-8") + # Security scan — roll back on block + scan_error = _security_scan_skill(existing["path"]) + if scan_error: + if original_content is not None: + target.write_text(original_content, encoding="utf-8") + else: + target.unlink(missing_ok=True) + return {"success": False, "error": scan_error} + return { "success": True, "message": f"File '{file_path}' written to skill '{name}'.", diff --git a/tools/skills_guard.py b/tools/skills_guard.py index 8403855f4..da3da5eeb 100644 --- a/tools/skills_guard.py +++ b/tools/skills_guard.py @@ -43,6 +43,7 @@ INSTALL_POLICY = { "builtin": ("allow", "allow", 
"allow"), "trusted": ("allow", "allow", "block"), "community": ("allow", "block", "block"), + "agent-created": ("allow", "block", "block"), } VERDICT_INDEX = {"safe": 0, "caution": 1, "dangerous": 2}