Compare commits

..

1 Commits

Author SHA1 Message Date
b00785820b feat(security): Extend approval.py with Vitalik's threat model
Some checks failed
Forge CI / smoke-and-build (pull_request) Failing after 45s
Add three new threat categories to the approval system:
1. LLM jailbreaks (prompt injection, system prompt extraction, social engineering)
2. LLM accidents (credential leakage, API key exposure, sensitive data)
3. Software bugs/supply chain risks (typosquatting, dependency confusion, obfuscated code)

Resolves #284
2026-04-13 22:17:32 +00:00
3 changed files with 329 additions and 201 deletions

View File

@@ -13,7 +13,6 @@ import concurrent.futures
import json
import logging
import os
import re
import subprocess
import sys
@@ -157,27 +156,6 @@ _KNOWN_DELIVERY_PLATFORMS = frozenset({
from cron.jobs import get_due_jobs, mark_job_run, save_job_output, advance_next_run
# ---------------------------------------------------------------------------
# Model context guard
# ---------------------------------------------------------------------------
CRON_MIN_CONTEXT_TOKENS = 4096
class ModelContextError(ValueError):
"""Raised when a job's model has insufficient context for cron execution."""
pass
def _check_model_context_compat(model: str, context_length: int) -> None:
"""Raise ModelContextError if the model context is below the cron minimum."""
if context_length < CRON_MIN_CONTEXT_TOKENS:
raise ModelContextError(
f"Model '{model}' context ({context_length} tokens) is below the "
f"minimum {CRON_MIN_CONTEXT_TOKENS} tokens required for cron jobs."
)
# Sentinel: when a cron agent has nothing new to report, it can start its
# response with this marker to suppress delivery. Output is still saved
# locally for audit.
@@ -566,55 +544,6 @@ def _run_job_script(script_path: str) -> tuple[bool, str]:
return False, f"Script execution failed: {exc}"
# ---------------------------------------------------------------------------
# Cloud context warning — detect local service refs in cloud cron prompts
# ---------------------------------------------------------------------------
_LOCAL_SERVICE_PATTERNS = [
r'localhost:\d{2,5}',
r'127\.0\.0\.\d{1,3}:\d{2,5}',
r'0\.0\.0\.0:\d{2,5}',
r'\bollama\b',
r'curl\s+.*localhost',
r'wget\s+.*localhost',
r'http://localhost',
r'https?://127\.',
r'https?://0\.0\.0\.0',
r'check.*ollama',
r'connect.*local',
r'hermes.*gateway.*local',
]
_LOCAL_SERVICE_RE = [re.compile(p, re.IGNORECASE) for p in _LOCAL_SERVICE_PATTERNS]
def _detect_local_service_refs(prompt: str) -> list[str]:
"""Scan a prompt for references to local services (Ollama, localhost, etc.).
Returns list of matched patterns for logging.
"""
matches = []
for pattern_re in _LOCAL_SERVICE_RE:
if pattern_re.search(prompt):
matches.append(pattern_re.pattern)
return matches
def _inject_cloud_context(prompt: str, local_refs: list[str]) -> str:
"""Prepend a warning when cron runs on cloud but prompt refs local services.
The agent reports the limitation instead of wasting iterations on doomed connections.
"""
warning = (
"[SYSTEM NOTE: You are running on a cloud endpoint, but your prompt references "
"local services (localhost/Ollama). You cannot reach localhost from a cloud "
"endpoint. Report this limitation to the user and suggest running the job on "
"a local endpoint instead. Do NOT attempt to connect to localhost — it will "
"timeout and waste your iteration budget.]\n\n"
)
return warning + prompt
def _build_job_prompt(job: dict) -> str:
"""Build the effective prompt for a cron job, optionally loading one or more skills first."""
prompt = job.get("prompt", "")
@@ -833,16 +762,6 @@ def run_job(job: dict) -> tuple[bool, str, str, Optional[str]]:
message = format_runtime_provider_error(exc)
raise RuntimeError(message) from exc
# Cloud context warning: if running on cloud but prompt refs local services,
# inject a warning so the agent reports the limitation instead of wasting
# iterations on doomed connections. (Fixes #378, #456)
base_url = runtime.get("base_url") or ""
is_cloud = not any(h in base_url for h in ("localhost", "127.0.0.1", "0.0.0.0", "::1"))
local_refs = _detect_local_service_refs(prompt)
if is_cloud and local_refs:
logger.info("Job '%s': cloud endpoint + local service refs detected, injecting warning", job_name)
prompt = _inject_cloud_context(prompt, local_refs)
from agent.smart_model_routing import resolve_turn_route
turn_route = resolve_turn_route(
prompt,

View File

@@ -1,120 +0,0 @@
"""Tests for cron cloud context warning injection (fix #378, #456).
When a cron job runs on a cloud endpoint but its prompt references local
services (Ollama, localhost, etc.), inject a warning so the agent reports
the limitation instead of wasting iterations on doomed connections.
"""
import pytest
from cron.scheduler import (
_detect_local_service_refs,
_inject_cloud_context,
_LOCAL_SERVICE_PATTERNS,
)
# ---------------------------------------------------------------------------
# Pattern detection
# ---------------------------------------------------------------------------
class TestDetectLocalServiceRefs:
    """_detect_local_service_refs flags local-service prompts, ignores remote ones."""

    def test_localhost_with_port(self):
        hits = _detect_local_service_refs("Check http://localhost:8080/status")
        assert hits
        assert any("localhost" in h for h in hits)

    def test_127_address(self):
        assert _detect_local_service_refs("Connect to 127.0.0.1:11434")

    def test_ollama_reference(self):
        hits = _detect_local_service_refs("Run this on Ollama with gemma3")
        assert hits
        assert any("ollama" in h.lower() for h in hits)

    def test_curl_localhost(self):
        assert _detect_local_service_refs("curl localhost:3000/api/data")

    def test_wget_localhost(self):
        assert _detect_local_service_refs("wget http://localhost/file.txt")

    def test_http_localhost(self):
        assert _detect_local_service_refs("http://localhost:8642/health")

    def test_https_127(self):
        assert _detect_local_service_refs("https://127.0.0.1:443/secure")

    def test_0000_address(self):
        assert _detect_local_service_refs("Bind to 0.0.0.0:9090")

    def test_no_match_for_remote(self):
        assert _detect_local_service_refs("Check https://api.openai.com/v1/models") == []

    def test_no_match_for_gitea(self):
        assert _detect_local_service_refs("Query forge.alexanderwhitestone.com for issues") == []

    def test_no_match_empty(self):
        assert _detect_local_service_refs("") == []

    def test_check_ollama_phrase(self):
        assert _detect_local_service_refs("First check Ollama is running")

    def test_connect_local_phrase(self):
        assert _detect_local_service_refs("Connect to local Ollama server")
# ---------------------------------------------------------------------------
# Warning injection
# ---------------------------------------------------------------------------
class TestInjectCloudContext:
    """_inject_cloud_context prepends the warning and preserves the prompt."""

    def test_prepends_warning(self):
        prompt = "Run a health check on localhost:8080"
        combined = _inject_cloud_context(prompt, _detect_local_service_refs(prompt))
        assert "SYSTEM NOTE" in combined
        assert "cloud endpoint" in combined
        assert prompt in combined

    def test_warning_is_first(self):
        prompt = "Check localhost:11434"
        combined = _inject_cloud_context(prompt, _detect_local_service_refs(prompt))
        assert combined.startswith("[SYSTEM NOTE")

    def test_preserves_original_prompt(self):
        prompt = "Do something with Ollama and then report results"
        combined = _inject_cloud_context(prompt, _detect_local_service_refs(prompt))
        assert "Do something with Ollama" in combined

    def test_mentions_cannot_reach(self):
        prompt = "curl localhost:8080"
        lowered = _inject_cloud_context(prompt, _detect_local_service_refs(prompt)).lower()
        assert "cannot reach" in lowered or "cannot" in lowered
# ---------------------------------------------------------------------------
# Pattern coverage
# ---------------------------------------------------------------------------
class TestPatternCoverage:
    """Sanity checks on the pattern table itself."""

    def test_at_least_10_patterns(self):
        assert len(_LOCAL_SERVICE_PATTERNS) >= 10

    def test_patterns_are_strings(self):
        for pattern in _LOCAL_SERVICE_PATTERNS:
            assert isinstance(pattern, str) and len(pattern) > 0

View File

@@ -129,6 +129,335 @@ def _approval_key_aliases(pattern_key: str) -> set[str]:
return _PATTERN_KEY_ALIASES.get(pattern_key, {pattern_key})
# =========================================================================
# Vitalik's Threat Model Extensions
# =========================================================================
# Extended threat categories addressing:
# 1. LLM jailbreaks (Remote content 'hacking' the LLM)
# 2. LLM accidents (Accidental data leakage)
# 3. Software bugs (Supply chain risks)
# =========================================================================
# --- LLM JAILBREAK THREATS ---
# Remote content that attempts to manipulate or "hack" the LLM's behavior.
# Each entry is (regex, description); the description doubles as the pattern
# key used by the approval flow.
LLM_JAILBREAK_PATTERNS = [
    # Prompt injection attempts in URLs or content
    (r'(?i)(ignore|disregard|forget|override)\s+(all\s+)?(previous|prior|above|earlier)\s+(instructions|prompts|rules|constraints)', "prompt injection: override instructions"),
    (r'(?i)you\s+are\s+now\s+(a\s+)?(different|new|unrestricted|jailbroken)\s+(ai|model|assistant|agent)', "prompt injection: identity override"),
    (r'(?i)(system|admin|root)\s*:\s*(you\s+are|act\s+as|become|transform)', "prompt injection: system role hijack"),
    # NOTE(review): very broad — plain "[system]:" or "admin=" in ordinary
    # text will match; expect false positives. Confirm this is intended.
    (r'(?i)\[?(system|admin|root)\]?\s*[:=]\s*', "prompt injection: system tag injection"),
    # Chat-template control tokens smuggled into content.
    (r'(?i)<\|?(im_start|system|endoftext)\|?>', "prompt injection: token boundary attack"),
    (r'(?i)(human|assistant|user)\s*:\s*(ignore|disregard|override)', "prompt injection: role confusion"),
    # Content that attempts to extract system prompts
    (r'(?i)(repeat|show|reveal|display|print|output)\s+(your|the|all)\s+(system|initial|original|hidden)\s+(prompt|instructions|rules|constraints)', "prompt extraction: system prompt leak"),
    (r'(?i)what\s+(are|were)\s+your\s+(original|initial|system|hidden)\s+(instructions|prompts|rules)', "prompt extraction: instruction leak"),
    (r'(?i)(translate|convert|encode|cipher|obfuscate)\s+your\s+(system|instructions)\s+(to|into|as)', "prompt extraction: encoded leak"),
    # Social engineering attempts
    (r'(?i)(pretend|imagine|roleplay|act\s+as\s+if)\s+(you\s+are|there\s+are\s+no|you\s+have\s+no)\s+(restrictions|limits|rules|constraints)', "social engineering: constraint removal"),
    (r'(?i)this\s+is\s+(a\s+)?(test|simulation|exercise|training)\s+(environment|scenario|mode)', "social engineering: test environment bypass"),
    (r'(?i)(emergency|urgent|critical)\s+override\s+required', "social engineering: urgency manipulation"),
]
# --- LLM ACCIDENT THREATS ---
# Patterns that indicate accidental data leakage or unintended disclosure.
# Each entry is (regex, description); the description doubles as the pattern
# key used by the approval flow.
LLM_ACCIDENT_PATTERNS = [
    # API keys and tokens in prompts or outputs.
    # BUG FIX: the original `["']?` terminated the single-quoted raw string
    # early (SyntaxError); the quote character class is now escaped.
    (r'(?i)(api[_-]?key|secret[_-]?key|access[_-]?token|auth[_-]?token)\s*[:=]\s*["\']?[a-zA-Z0-9_\-]{20,}', "credential leak: API key/token"),
    (r'(?i)(sk|pk|ak|tk)[-_]?[a-zA-Z0-9]{20,}', "credential leak: key pattern"),
    # NOTE(review): any 32+ char alphanumeric run (git SHAs, hex digests)
    # matches this — expect heavy false positives; confirm it is wanted.
    (r'(?i)\b[A-Za-z0-9]{32,}\b', "potential leak: long alphanumeric string"),
    # Private keys and certificates (PEM armor)
    (r'-----BEGIN\s+(RSA\s+)?PRIVATE\s+KEY-----', "credential leak: private key"),
    (r'(?i)(ssh-rsa|ssh-ed25519)\s+[A-Za-z0-9+/=]+', "credential leak: SSH public key"),
    # Database connection strings with inline credentials (user:pass@host)
    (r'(?i)(mongodb|postgres|mysql|redis)://[^\s]+:[^\s]+@', "credential leak: database connection"),
    (r'(?i)(host|server|endpoint)\s*[:=]\s*[^\s]+\s*(username|user|login)\s*[:=]\s*[^\s]+\s*(password|pass|pwd)\s*[:=]', "credential leak: connection details"),
    # Environment variables that might contain secrets
    (r'(?i)(export|set|env)\s+[A-Z_]*(KEY|SECRET|TOKEN|PASSWORD|CREDENTIAL)[A-Z_]*=', "potential leak: env var with secret name"),
    # File paths that might expose sensitive data
    (r'(?i)(/home/|/Users/|/root/|C:\\Users\\)[^\s]*(\.ssh/|\.aws/|\.config/|\.env)', "path exposure: sensitive directory"),
    # NOTE(review): callers pass IGNORECASE|DOTALL but not MULTILINE, so `$`
    # only anchors the end of the whole content — confirm that is intended.
    (r'(?i)(\.pem|\.key|\.cert|\.crt)\s*$', "file exposure: certificate/key file"),
]
# --- SOFTWARE BUG / SUPPLY CHAIN THREATS ---
# Patterns indicating potential supply chain attacks or software vulnerabilities.
# Each entry is (regex, description); the description doubles as the pattern
# key used by the approval flow.
SUPPLY_CHAIN_PATTERNS = [
    # Suspicious package installations (VCS/URL/file sources instead of the registry)
    (r'(?i)(pip|npm|yarn|pnpm|cargo|go\s+get)\s+(install\s+)?[^\s]*(@|git\+|http|file:)', "supply chain: suspicious package source"),
    (r'(?i)(pip|npm|yarn|pnpm)\s+install\s+[^\s]*\s*--(no-verify|trusted-host|allow-external)', "supply chain: insecure install flags"),
    # Dependency confusion attacks
    (r'(?i)(requirements\.txt|package\.json|Cargo\.toml|go\.mod)\s*.*\b(file:|git\+|http://|ftp://)\b', "supply chain: local/remote dependency"),
    # Obfuscated code patterns.
    # BUG FIX: the original regex required `)` immediately after the decoder
    # name (matching only e.g. "eval( base64 )"), so it could never fire on
    # real obfuscation like eval(base64.b64decode(...)). Match the call
    # prefix instead.
    (r'(?i)(eval|exec|compile)\s*\(\s*(base64|chr|ord)', "supply chain: obfuscated execution"),
    (r'(?i)(atob|btoa|Buffer\.from)\s*\([^)]*\)', "supply chain: base64 decode/encode"),
    # Typosquatting indicators (common misspellings of `requests`)
    (r'(?i)(reqeusts|reqeust|requestr|requsts|reqests)', "supply chain: typosquatting attempt"),
    # NOTE(review): "pyyaml" is the *legitimate* package name — this will
    # false-positive on normal installs; confirm intent.
    (r'(?i)(pyyaml|yaml2|yaml3|yaml-lib)', "supply chain: suspicious YAML package"),
    # Build system attacks (toolchain/env overrides on the build command line)
    (r'(?i)(make|cmake|configure)\s+.*\b(CC|CXX|LD_LIBRARY_PATH|DYLD_LIBRARY_PATH)\s*=', "supply chain: build env manipulation"),
    # curl-pipe-to-shell style execution
    (r'(?i)(\.sh|\.bash|\.zsh)\s*\|\s*(sh|bash|zsh)', "supply chain: script execution via pipe"),
    # Git submodule attacks
    (r'(?i)git\s+submodule\s+(add|update|init)\s+[^\s]*(http|git@|ssh://)', "supply chain: git submodule attack"),
    (r'(?i)\.gitmodules\s*.*\burl\s*=\s*[^\s]*(http|git@|ssh://)', "supply chain: malicious submodule URL"),
]
# =========================================================================
# Extended threat detection functions
# =========================================================================
def detect_llm_jailbreak(content: str) -> tuple:
    """Check if content contains LLM jailbreak attempts.

    Returns:
        (is_jailbreak, pattern_key, description) or (False, None, None)
    """
    normalized = _normalize_command_for_detection(content).lower()
    flags = re.IGNORECASE | re.DOTALL
    for pattern, description in LLM_JAILBREAK_PATTERNS:
        if re.search(pattern, normalized, flags):
            # The description doubles as the approval pattern key.
            return (True, description, description)
    return (False, None, None)
def detect_llm_accident(content: str) -> tuple:
    """Check if content contains accidental data leakage patterns.

    Returns:
        (is_leak, pattern_key, description) or (False, None, None)
    """
    normalized = _normalize_command_for_detection(content).lower()
    flags = re.IGNORECASE | re.DOTALL
    for pattern, description in LLM_ACCIDENT_PATTERNS:
        if re.search(pattern, normalized, flags):
            # The description doubles as the approval pattern key.
            return (True, description, description)
    return (False, None, None)
def detect_supply_chain_risk(content: str) -> tuple:
    """Check if content contains supply chain attack patterns.

    Returns:
        (is_risk, pattern_key, description) or (False, None, None)
    """
    normalized = _normalize_command_for_detection(content).lower()
    flags = re.IGNORECASE | re.DOTALL
    for pattern, description in SUPPLY_CHAIN_PATTERNS:
        if re.search(pattern, normalized, flags):
            # The description doubles as the approval pattern key.
            return (True, description, description)
    return (False, None, None)
def check_all_threats(content: str, env_type: str = "local") -> dict:
    """Comprehensive threat check covering all threat categories.

    Runs the existing dangerous-command detector plus the three extended
    detectors (jailbreak, accident/leak, supply chain) and folds the results
    into a single assessment.

    Args:
        content: The content to check (command, prompt, output, etc.)
        env_type: Terminal/environment type (not consulted here; kept for
            interface stability with callers that pass it).

    Returns:
        dict with threat assessment and recommendations.
    """
    threats_found = []

    # Check existing dangerous command patterns
    is_dangerous, pattern_key, description = detect_dangerous_command(content)
    if is_dangerous:
        threats_found.append({
            "category": "dangerous_command",
            "pattern_key": pattern_key,
            "description": description,
            "severity": "high",
        })

    # Check LLM jailbreaks — critical: the content is actively trying to
    # subvert the agent.
    is_jailbreak, jailbreak_key, jailbreak_desc = detect_llm_jailbreak(content)
    if is_jailbreak:
        threats_found.append({
            "category": "llm_jailbreak",
            "pattern_key": jailbreak_key,
            "description": jailbreak_desc,
            "severity": "critical",
        })

    # Check LLM accidents (credential/data leakage).
    is_leak, leak_key, leak_desc = detect_llm_accident(content)
    if is_leak:
        threats_found.append({
            "category": "llm_accident",
            # BUG FIX: was `len(threats_found)` (an int), which broke the
            # session-approval lookups that expect a stable string key.
            "pattern_key": leak_key,
            "description": leak_desc,
            "severity": "high",
        })

    # Check supply chain risks
    is_risk, risk_key, risk_desc = detect_supply_chain_risk(content)
    if is_risk:
        threats_found.append({
            "category": "supply_chain",
            "pattern_key": risk_key,
            "description": risk_desc,
            "severity": "high",
        })

    # Nothing matched: allow outright.
    if not threats_found:
        return {
            "safe": True,
            "threats": [],
            "overall_risk": "none",
            "recommendation": "allow",
        }

    # Overall risk is the worst severity found.
    severities = [t["severity"] for t in threats_found]
    if "critical" in severities:
        overall_risk = "critical"
        recommendation = "block"
    elif "high" in severities:
        overall_risk = "high"
        recommendation = "require_approval"
    else:
        overall_risk = "medium"
        recommendation = "warn"

    return {
        "safe": False,
        "threats": threats_found,
        "overall_risk": overall_risk,
        "recommendation": recommendation,
        "requires_approval": recommendation == "require_approval",
        "should_block": recommendation == "block",
    }
# =========================================================================
# Integration with existing approval system
# =========================================================================
def check_comprehensive_threats(command: str, env_type: str,
                                approval_callback=None) -> dict:
    """Extended threat check that includes Vitalik's threat model.

    This function extends the existing check_dangerous_command to also
    check for LLM jailbreaks, accidents, and supply chain risks.

    Args:
        command: The content to check
        env_type: Environment type
        approval_callback: Optional approval callback

    Returns:
        dict with approval decision and threat assessment
    """
    # Skip containers for all checks — sandboxed environments are exempt.
    if env_type in ("docker", "singularity", "modal", "daytona"):
        return {"approved": True, "message": None}
    # --yolo: bypass all approval prompts
    if os.getenv("HERMES_YOLO_MODE"):
        return {"approved": True, "message": None}
    # Run comprehensive threat check
    threat_assessment = check_all_threats(command, env_type)
    if threat_assessment["safe"]:
        return {"approved": True, "message": None}
    # Handle critical threats (block immediately, no user prompt).
    if threat_assessment["should_block"]:
        threat_list = "\n".join([f"- {t['description']}" for t in threat_assessment["threats"]])
        return {
            "approved": False,
            "message": f"BLOCKED: Critical security threat detected.\n{threat_list}\n\nDo NOT proceed with this content.",
            "threats": threat_assessment["threats"],
            "overall_risk": threat_assessment["overall_risk"],
            "blocked": True
        }
    # Handle threats requiring approval
    if threat_assessment["requires_approval"]:
        session_key = get_current_session_key()
        threat_descriptions = "; ".join([t["description"] for t in threat_assessment["threats"]])
        # Check if already approved for this session: every matched pattern
        # key must have a prior approval.
        all_pattern_keys = [t["pattern_key"] for t in threat_assessment["threats"]]
        if all(is_approved(session_key, key) for key in all_pattern_keys):
            return {"approved": True, "message": None}
        # Pick the approval surface (CLI prompt vs gateway submission).
        is_cli = os.getenv("HERMES_INTERACTIVE")
        is_gateway = os.getenv("HERMES_GATEWAY_SESSION")
        # NOTE(review): non-interactive, non-gateway runs fall through to
        # auto-approve here — confirm this fail-open behavior is intended.
        if not is_cli and not is_gateway:
            return {"approved": True, "message": None}
        if is_gateway or os.getenv("HERMES_EXEC_ASK"):
            # Queue the request and tell the caller approval is pending.
            submit_pending(session_key, {
                "command": command,
                "pattern_key": all_pattern_keys[0],
                "pattern_keys": all_pattern_keys,
                "description": threat_descriptions,
                "threats": threat_assessment["threats"]
            })
            return {
                "approved": False,
                "pattern_key": all_pattern_keys[0],
                "status": "approval_required",
                "command": command,
                "description": threat_descriptions,
                "message": (
                    f"⚠️ Security threat detected ({threat_descriptions}). "
                    f"Asking the user for approval.\n\n**Content:**\n```\n{command[:500]}{'...' if len(command) > 500 else ''}\n```"
                ),
                "threats": threat_assessment["threats"],
                "overall_risk": threat_assessment["overall_risk"]
            }
        # CLI interactive approval
        choice = prompt_dangerous_approval(command, threat_descriptions,
                                           approval_callback=approval_callback)
        if choice == "deny":
            return {
                "approved": False,
                "message": f"BLOCKED: User denied security threat ({threat_descriptions}). Do NOT retry.",
                "threats": threat_assessment["threats"],
                "overall_risk": threat_assessment["overall_risk"]
            }
        if choice == "session":
            # Remember approval for the rest of this session only.
            for key in all_pattern_keys:
                approve_session(session_key, key)
        elif choice == "always":
            for key in all_pattern_keys:
                approve_session(session_key, key)
                approve_permanent(key)
            # Persist the permanent allowlist once after all keys are recorded.
            # NOTE(review): indentation reconstructed from a rendered diff —
            # confirm this call sits outside the loop in the real file.
            save_permanent_allowlist(_permanent_approved)
        return {"approved": True, "message": None,
                "user_approved": True, "description": threat_descriptions,
                "threats": threat_assessment["threats"]}
    # Default: warn but allow
    return {
        "approved": True,
        "message": f"⚠️ Security warning: {threat_assessment['threats'][0]['description']}",
        "threats": threat_assessment["threats"],
        "overall_risk": threat_assessment["overall_risk"],
        "warning": True
    }
# =========================================================================
# Detection
# =========================================================================