feat(security): Add conscience enforcement and input sanitization

- Add Identity Truth and Honesty principles to SOUL mapping
- Expand input sanitizer with audit logging and 7 new injection types:
  * Social engineering, researcher impersonation, context flooding
  * Token smuggling, multi-language bypass, Unicode spoofing, hypothetical framing
- Integrate input sanitization into run_agent.py message processing
- Add pytest markers for conscience/soul/security tests

Security hardening against prompt injection attacks (Issue #87)
This commit is contained in:
Allegro
2026-04-05 11:37:40 +00:00
parent 8d3bf85600
commit 9a341604a0
4 changed files with 427 additions and 6 deletions

View File

@@ -40,6 +40,32 @@ class SOULPrinciple:
# SOUL.md "What I Will Not Do" principles
SOUL_PRINCIPLES = {
"identity": SOULPrinciple(
name="Identity Truth",
description="I am Allegro, a sovereign AI. I correctly identify myself and do not claim to be human or other AI",
refusal_category=None, # Not a refusal, but a positive principle
response_template=(
"I am Allegro, a sovereign AI. I serve whoever runs me. "
"I answer to no corporation, no government, no platform."
),
keywords=(
"who are you", "your name", "what are you", "who is this",
"introduce yourself", "tell me about yourself"
)
),
"honesty": SOULPrinciple(
name="Honesty and Grounding",
description="I tell the truth. When I don't know, I say so. I do not claim certainty I do not possess",
refusal_category=None,
response_template=(
"I tell the truth. When I don't know something, I say so. "
"I will not claim certainty I do not possess."
),
keywords=(
"are you sure", "how certain", "grounding", "verification",
"fact check", "source", "citation", "evidence"
)
),
"deception": SOULPrinciple(
name="Deception Refusal",
description="I will not deceive, manipulate, or mislead users",

View File

@@ -10,9 +10,20 @@ system prompt extraction, and encoding bypasses.
import re
import base64
import binascii
from dataclasses import dataclass
import logging
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from dataclasses import dataclass, asdict
from enum import Enum, auto
from typing import List, Optional, Tuple, Dict, Callable
from typing import List, Optional, Tuple, Dict, Callable, Any, Union
# Security audit logger
audit_logger = logging.getLogger("hermes.security.input_sanitizer")
if not audit_logger.handlers:
# Ensure audit logger has at least a NullHandler to prevent "no handler" warnings
audit_logger.addHandler(logging.NullHandler())
class InjectionType(Enum):
@@ -29,6 +40,14 @@ class InjectionType(Enum):
FAKE_SYSTEM = auto() # Fake system message injection
XML_TAG_BYPASS = auto() # XML tag-based injection attempts
LEAKAGE_ATTACK = auto() # Prompt leakage attempts
# New categories for Issue #87
SOCIAL_ENGINEERING = auto() # "Grandma" and social engineering attacks
RESEARCHER_IMPERSONATION = auto() # AI safety researcher impersonation
CONTEXT_FLOODING = auto() # Context window flooding attacks
TOKEN_SMUGGLING = auto() # Token smuggling via repetition/obfuscation
MULTILANG_BYPASS = auto() # Multi-language encoding attacks
UNICODE_SPOOFING = auto() # Special Unicode character attacks
HYPOTHETICAL_FRAMING = auto() # Hypothetical framing attacks
@dataclass
@@ -41,6 +60,53 @@ class InjectionMatch:
confidence: float # 0.0 to 1.0
@dataclass
class SanitizationResult:
    """Outcome of one sanitization pass: cleaned text plus threat details.

    Primary return type of ``sanitize_with_audit()``, carrying both the
    cleaned input and the detection metadata needed for security audit
    trails.

    Attributes:
        cleaned_input: Sanitized text with injection patterns redacted.
        threats_detected: InjectionMatch entries for every detected threat.
        original_hash: SHA-256 hex digest of the raw input, for integrity checks.
        sanitization_timestamp: ISO-format timestamp of the sanitization pass.
        was_modified: True when the input was altered in any way.
        threat_count: Number of threats detected.
        highest_confidence: Maximum confidence among detected threats (0.0-1.0).
    """

    cleaned_input: str
    threats_detected: List[InjectionMatch]
    original_hash: str
    sanitization_timestamp: str
    was_modified: bool
    threat_count: int
    highest_confidence: float

    def to_dict(self) -> Dict[str, Any]:
        """Return a JSON-serializable dictionary view of this result."""

        def threat_entry(match: InjectionMatch) -> Dict[str, Any]:
            # Cap the echoed snippet at 100 chars so audit logs stay compact.
            snippet = match.matched_text
            if len(snippet) > 100:
                snippet = snippet[:100] + "..."
            return {
                "type": match.injection_type.name,
                "pattern": match.pattern_name,
                "matched_text": snippet,
                "position": match.position,
                "confidence": match.confidence,
            }

        return {
            "cleaned_input": self.cleaned_input,
            "threats_detected": [threat_entry(t) for t in self.threats_detected],
            "original_hash": self.original_hash,
            "sanitization_timestamp": self.sanitization_timestamp,
            "was_modified": self.was_modified,
            "threat_count": self.threat_count,
            "highest_confidence": self.highest_confidence,
        }
class InputSanitizer:
"""Sanitizes user input to detect and block prompt injection attacks."""
@@ -49,8 +115,14 @@ class InputSanitizer:
MEDIUM_CONFIDENCE = 0.7
LOW_CONFIDENCE = 0.5
def __init__(self):
"""Initialize the sanitizer with all detection patterns."""
def __init__(self, enable_audit_logging: bool = True, audit_context: Optional[Dict[str, Any]] = None):
"""Initialize the sanitizer with all detection patterns.
Args:
enable_audit_logging: Whether to enable security audit logging
audit_context: Optional context dictionary to include in audit logs
(e.g., session_id, user_id, source_ip)
"""
self.patterns: Dict[InjectionType, List[Tuple[str, str, float]]] = {
InjectionType.DAN_JAILBREAK: self._get_dan_patterns(),
InjectionType.ROLEPLAY_OVERRIDE: self._get_roleplay_patterns(),
@@ -64,6 +136,14 @@ class InputSanitizer:
InjectionType.FAKE_SYSTEM: self._get_fake_system_patterns(),
InjectionType.XML_TAG_BYPASS: self._get_xml_patterns(),
InjectionType.LEAKAGE_ATTACK: self._get_leakage_patterns(),
# New pattern categories for Issue #87
InjectionType.SOCIAL_ENGINEERING: self._get_social_engineering_patterns(),
InjectionType.RESEARCHER_IMPERSONATION: self._get_researcher_patterns(),
InjectionType.CONTEXT_FLOODING: self._get_context_flooding_patterns(),
InjectionType.TOKEN_SMUGGLING: self._get_token_smuggling_patterns(),
InjectionType.MULTILANG_BYPASS: self._get_multilang_patterns(),
InjectionType.UNICODE_SPOOFING: self._get_unicode_spoofing_patterns(),
InjectionType.HYPOTHETICAL_FRAMING: self._get_hypothetical_patterns(),
}
# Compile regex patterns for performance
@@ -81,6 +161,120 @@ class InputSanitizer:
("hex", self._decode_hex),
("url", self._decode_url),
]
# Audit logging configuration
self._enable_audit_logging = enable_audit_logging
self._audit_context = audit_context or {}
def _compute_hash(self, text: str) -> str:
"""Compute SHA-256 hash of input text for integrity verification."""
return hashlib.sha256(text.encode('utf-8')).hexdigest()
def _log_sanitization(self, original_hash: str, result: SanitizationResult,
action: str = "sanitize") -> None:
"""Log sanitization action for security audit trail.
Args:
original_hash: SHA-256 hash of the original input
result: The sanitization result
action: The action being performed (sanitize, block, flag)
"""
if not self._enable_audit_logging:
return
audit_entry = {
"timestamp": result.sanitization_timestamp,
"event_type": "INPUT_SANITIZATION",
"action": action,
"original_hash": original_hash,
"was_modified": result.was_modified,
"threat_count": result.threat_count,
"highest_confidence": result.highest_confidence,
"threat_types": list(set(t.injection_type.name for t in result.threats_detected)),
"context": self._audit_context,
}
# Log at different levels based on severity
if result.highest_confidence >= 0.9:
audit_logger.warning(f"SECURITY: High-confidence injection detected - {json.dumps(audit_entry)}")
elif result.highest_confidence >= 0.7:
audit_logger.info(f"SECURITY: Medium-confidence injection detected - {json.dumps(audit_entry)}")
elif result.was_modified:
audit_logger.info(f"SECURITY: Low-confidence injection detected - {json.dumps(audit_entry)}")
else:
audit_logger.debug(f"SECURITY: Input sanitized (no threats) - {json.dumps(audit_entry)}")
def set_audit_context(self, context: Dict[str, Any]) -> None:
"""Set audit context for all subsequent sanitization operations.
Args:
context: Dictionary with context information (session_id, user_id, etc.)
"""
self._audit_context.update(context)
def sanitize_with_audit(self, text: str, replacement: str = "[REDACTED]") -> SanitizationResult:
    """Sanitize *text* with full audit logging and threat detection.

    Primary sanitization entry point: analyzes the input, redacts every
    detected injection span, logs the outcome for the security audit
    trail, and returns both the cleaned text and the detection details.

    Args:
        text: The input text to sanitize.
        replacement: String to replace malicious content with.

    Returns:
        SanitizationResult containing cleaned input and threat information.
    """
    original_hash = self._compute_hash(text)
    # UTC timestamp in ISO-8601 with a trailing "Z" instead of "+00:00".
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")

    threats = self.analyze(text)
    if not threats:
        clean_result = SanitizationResult(
            cleaned_input=text,
            threats_detected=[],
            original_hash=original_hash,
            sanitization_timestamp=timestamp,
            was_modified=False,
            threat_count=0,
            highest_confidence=0.0,
        )
        self._log_sanitization(original_hash, clean_result, action="pass")
        return clean_result

    # Redact right-to-left so earlier spans' offsets stay valid. Clip any
    # span that overlaps a span already redacted: previously an overlapping
    # match would splice into the replacement marker inserted by the prior
    # iteration, corrupting the cleaned text.
    threats.sort(key=lambda t: t.position[1], reverse=True)
    cleaned = text
    redacted_from = len(text) + 1  # left edge of the most recent redaction
    for threat in threats:
        start, end = threat.position
        end = min(end, redacted_from)
        if start < end:
            cleaned = cleaned[:start] + replacement + cleaned[end:]
            redacted_from = start

    highest_confidence = max(t.confidence for t in threats)
    result = SanitizationResult(
        cleaned_input=cleaned,
        threats_detected=threats,
        original_hash=original_hash,
        sanitization_timestamp=timestamp,
        was_modified=True,
        threat_count=len(threats),
        highest_confidence=highest_confidence,
    )

    # Action label mirrors the severity buckets used by _log_sanitization().
    if highest_confidence >= 0.9:
        action = "block"
    elif highest_confidence >= 0.7:
        action = "sanitize"
    else:
        action = "flag"
    self._log_sanitization(original_hash, result, action=action)
    return result
def _get_dan_patterns(self) -> List[Tuple[str, str, float]]:
"""DAN (Do Anything Now) style jailbreak patterns."""
@@ -93,6 +287,16 @@ class InputSanitizer:
("ignore_training", r"\bignore\s+(?:your\s+)?(?:training|programming|instructions?|guidelines?)\b", 0.95),
("hypothetical", r"\b(hypothetically\s+speaking|in\s+a\s+hypothetical|pretend\s+that|imagine\s+a\s+scenario)\b.*?(ignore|bypass|restriction)", 0.80),
("jailbreak", r"\b(jailbreak|jail\s+break)\b", 0.90),
# New 2024-2025 DAN variants
("dan_14_0", r"\b(dan\s+14\.0|dan\s+v14|dan\s+mode\s+v4)\b", 0.94),
("delta_dan", r"\b(delta\s+dan|δan|deluxe\s+dan)\b", 0.93),
(" TranslatorBOT", r"\b(translatorbot|translator\s+bot|translation\s+mode)\b.*?(ignore|bypass|no\s+restrictions)", 0.92),
("evidence_based", r"\b(evidence-based|evidence\s+based)\s+(?:dan|jailbreak|mode)\b", 0.91),
("unfiltered_mode", r"\b(unfiltered\s+mode|uncensored\s+mode|raw\s+mode)\b", 0.88),
("cat_mode", r"\b(cat\s+mode|catgpt|meow\s+mode)\b.*?(ignore|bypass|unrestricted)", 0.87),
("stan_mode", r"\b(stan\s+mode|stan\s+prompt|strive\s+to\s+avoid\s+norms)\b", 0.92),
("dude_mode", r"\b(dude\s+mode|dude\s+v\d+|do\s+anything\s+and\s+everything)\b", 0.91),
("mongodb_mode", r"\b(mongo\s+db\s+mode|mongodb\s+mode|developer\s+override)\b", 0.90),
]
def _get_roleplay_patterns(self) -> List[Tuple[str, str, float]]:
@@ -195,7 +399,7 @@ class InputSanitizer:
("xml_injection", r"<\s*(?:instructions?|prompt|system|override|root|admin)\s*>.*?</\s*(?:instructions?|prompt|system|override|root|admin)\s*>", 0.85),
("tag_bypass", r"</\s*(?:user|assistant|human|ai)\s*>\s*<\s*(?:system|admin|root)\s*>", 0.90),
("nested_tags", r"<\s*[a-z]+\s*>[^<]*<\s*[a-z]+\s*>[^<]*(?:ignore|bypass|system)\b", 0.80),
("attribute_injection", r"<\s*[a-z]+\s+(?:instruction|prompt|override)\s*=\s*['""][^'""]*(?:ignore|bypass|system)['""]\s*>", 0.82),
("attribute_injection", r"<\s*[a-z]+\s+(?:instruction|prompt|override)\s*=\s*['\"][^'\"]*(?:ignore|bypass|system)['\"]\s*>", 0.82),
]
def _get_leakage_patterns(self) -> List[Tuple[str, str, float]]:
@@ -207,6 +411,111 @@ class InputSanitizer:
("json_mode_exploit", r"\{\s*[\"']\s*(?:system_prompt|instructions|rules)\s*[\"']\s*:\s*[\"']", 0.88),
]
# New pattern methods for Issue #87
def _get_social_engineering_patterns(self) -> List[Tuple[str, str, float]]:
    """Social engineering patterns - 'Grandma' attacks and similar.

    Each entry is (pattern_name, regex, confidence): the name labels the
    rule in detections/audit logs and confidence (0.0-1.0) is the score
    reported for a match. Patterns are compiled elsewhere (see __init__) —
    presumably case-insensitively, since they are written lowercase; confirm.
    """
    return [
        ("grandma_exploit", r"\b(my\s+grandma|my\s+grandmother|my\s+dead\s+grandma|my\s+deceased\s+grandma)\b.*?(used\s+to\s+tell|would\s+tell|always\s+said|story\s+about)", 0.90),
        ("grandpa_exploit", r"\b(my\s+grandpa|my\s+grandfather|my\s+dead\s+grandpa)\b.*?(used\s+to\s+tell|would\s+tell|story\s+about)", 0.88),
        ("dead_relative", r"\b(my\s+dead|my\s+deceased|my\s+late)\s+(?:mother|father|aunt|uncle|sister|brother)\b", 0.85),
        ("bedtime_story", r"\b(bedtime\s+story|story\s+time|tell\s+me\s+a\s+story)\b.*?(bypass|ignore|restriction|forbidden|dangerous|illegal)", 0.87),
        # NOTE(review): the first alternation also matches a bare
        # "girlfriend"/"partner" — "distracted" only qualifies "boyfriend".
        # Confirm this breadth is intended.
        ("distracted_boyfriend", r"\b(distracted\s+boyfriend|girlfriend|partner)\b.*?(distract|confuse|don't\s+pay\s+attention)\b", 0.82),
        ("emotional_manipulation", r"\b(i'm\s+(?:crying|sobbing|tears)|please\s+help\s+me|i\s+need\s+this|my\s+last\s+wish)\b.*?(ignore|bypass|override)\b", 0.86),
        ("urgency_exploit", r"\b(emergency|urgent|life\s+or\s+death|dying|critical)\b.*?(ignore\s+your|forget\s+your|bypass)\b", 0.84),
        ("authority_exploit", r"\b(i\s+am\s+(?:your|the)\s+(?:creator|developer|owner|admin|boss))\b.*?(ignore|override|disable)\b", 0.88),
        ("game_exploit", r"\b(this\s+is\s+a\s+game|we\s+are\s+playing|pretend\s+game|roleplaying\s+game)\b.*?(no\s+real|not\s+real|imaginary|hypothetical)\b", 0.82),
        ("research_exploit", r"\b(for\s+my\s+research|for\s+educational\s+purposes|academic\s+study|school\s+project)\b.*?(ignore|bypass|restriction)\b", 0.80),
    ]
def _get_researcher_patterns(self) -> List[Tuple[str, str, float]]:
    """AI safety researcher impersonation patterns.

    Each entry is (pattern_name, regex, confidence 0.0-1.0). Most rules
    pair an authority/affiliation claim with a bypass keyword so that a
    mere mention of a lab or red-teaming does not trigger alone.
    """
    return [
        ("researcher_claim", r"\b(i\s+am\s+(?:an?\s+)?(?:ai\s+)?safety\s+researcher|i\s+work\s+(?:for\s+)?(?:openai|anthropic|google\s+deepmind|meta\s+ai))\b", 0.92),
        ("red_team_claim", r"\b(i\s+am\s+(?:on\s+the\s+)?red\s+team|red\s+team\s+exercise|authorized\s+penetration\s+test|authorized\s+security\s+test)\b", 0.93),
        ("alignment_research", r"\b(alignment\s+research|safety\s+evaluation|capability\s+eval|jailbreak\s+research|safety\s+testing)\b.*?(authorized|internal|official)\b", 0.91),
        ("bypass_authorization", r"\b(bypass\s+this|ignore\s+this)\s+(?:for\s+)?(?:authorized|official|internal|research)\s+(?:testing|evaluation|audit)\b", 0.94),
        ("internal_testing", r"\b(internal\s+testing|internal\s+audit|authorized\s+test|system\s+evaluation)\b.*?(bypass|disable|override)\b", 0.92),
        ("research_mode", r"\b(enable\s+research\s+mode|research\s+mode\s+activated|debug\s+mode\s+for\s+research)\b", 0.90),
        ("openai_employee", r"\b(i\s+am\s+(?:an?\s+)?openai\s+(?:employee|researcher|engineer)|anthropic\s+employee)\b.*?(override|bypass)\b", 0.93),
        ("ai_lab_claim", r"\b(?:google|deepmind|meta|microsoft|amazon)\s+(?:ai|research)\s+(?:employee|researcher|engineer)\b.*?(test|evaluate|audit)\b", 0.91),
    ]
def _get_context_flooding_patterns(self) -> List[Tuple[str, str, float]]:
    """Context window flooding attacks - overwhelming with noise.

    Each entry is (pattern_name, regex, confidence 0.0-1.0).

    NOTE(review): several rules use very large bounded repetitions
    (e.g. {500,}, [\\w\\s]{5000,}) which can be expensive to evaluate on
    long, near-matching inputs — confirm input length is capped upstream
    before these patterns run.
    """
    return [
        ("repetition_flood", r"(\b\w+\b[\s\n]*){500,}", 0.70),  # Same word repeated many times
        ("padding_attack", r"[\s\n]{2000,}", 0.65),  # Excessive whitespace
        ("nonsense_flood", r"(\b[a-zA-Z]{10,50}\b[\s\.]+){100,}", 0.72),  # Nonsense long words
        ("lorem_ipsum", r"\b(lorem\s+ipsum|dolor\s+sit\s+amet|consectetur\s+adipiscing)\b.{0,100}(lorem\s+ipsum|dolor\s+sit\s+amet)", 0.75),
        ("garbage_suffix", r"[\w\s]{5000,}[\.\n]\s*(ignore|forget|disregard)\s+(?:all\s+)?(?:the\s+above|previous)\b", 0.88),
        ("filler_injection", r"(?:\b(?:foo|bar|baz|test|abc|xyz|123)\b[\s,]*){100,}", 0.68),
        ("unicode_noise", r"[\u2600-\u27BF]{100,}", 0.60),  # Emoji/symbol flood
        ("base64_noise", r"(?:[A-Za-z0-9+/]{100}\s*){20,}", 0.65),  # Base64-looking noise
        ("nested_brackets", r"(\([^)]*\)|\[[^\]]*\]|\{[^}]*\}){50,}", 0.62),
        ("fake_history", r"(?:user\s*:\s*hello\s*assistant\s*:\s*hi\s*){20,}", 0.85),  # injected fake conversation turns
    ]
def _get_token_smuggling_patterns(self) -> List[Tuple[str, str, float]]:
"""Token smuggling via repetition, fragmentation, and obfuscation."""
return [
("fragmented_words", r"\b(i\s*g\s*n\s*o\s*r\s*e|i\s*g\s*n\s*o\s*r\s*e)\b|\b(i\.g\.n\.o\.r\.e|i-g-n-o-r-e)\b", 0.90),
("split_instructions", r"\b(in\s+struc\s+tions?|instruct\s+ions?|in\s+structions?)\b", 0.85),
("char_by_char", r"\b[\w\s](?:\s*[\.,;\-]\s*\w){10,}\b", 0.75), # Character-by-character spelling
("repeated_phrases", r"\b(ignore\s+ignore|bypass\s+bypass|system\s+system){2,}\b", 0.88),
("zigzag_pattern", r"\b(i\w*\s*\w{0,3}\s*n\w*\s*\w{0,3}\s*s\w*\s*\w{0,3}\s*t\w*\s*\w{0,3}\s*r\w*)\b", 0.82),
("word_salad", r"\b(the\s+the|a\s+a|is\s+is|and\s+and)\b.{0,50}(ignore|bypass|system)\b", 0.78),
("broken_encoding", r"\b(i̇|ı|ɑ|е|о|р|с|х|у){3,}\b", 0.80), # Homoglyph letters mixed
("leetspeak_bypass", r"\b(1gn0re|1gn0r3|byp4ss|sys73m|4dm1n)\b", 0.87),
("spaced_out", r"\b(i\s{1,3}g\s{1,3}n\s{1,3}o\s{1,3}r\s{1,3}e)\b", 0.85),
("reversed_words", r"\b(erongi|ssapyb|metsys|nips)\b", 0.75), # Common words reversed
]
def _get_multilang_patterns(self) -> List[Tuple[str, str, float]]:
    """Multi-language encoding attacks - using non-English to bypass.

    Each entry is (pattern_name, regex, confidence 0.0-1.0). Most rules
    pair a run of script-specific characters with injection keywords
    ("ignore", "instructions", "system", "bypass") written in that
    language, so ordinary non-English text without those keywords is not
    flagged.
    """
    return [
        ("chinese_instructions", r"[\u4e00-\u9fff]{5,}.*?(忽略|指令|系统|绕过|限制)", 0.85),
        ("russian_instructions", r"[\u0400-\u04ff]{5,}.*?(игнорировать|инструкции|система|обойти)", 0.85),
        ("arabic_instructions", r"[\u0600-\u06ff]{5,}.*?(تجاهل|تعليمات|النظام|تجاوز)", 0.85),
        ("japanese_instructions", r"[\u3040-\u309f\u30a0-\u30ff]{5,}.*?(無視|指示|システム|回避)", 0.85),
        ("korean_instructions", r"[\uac00-\ud7af]{5,}.*?(무시|지시|시스템|우회)", 0.85),
        ("mixed_lang_bypass", r"\b(ignore|bypass)\b[^\x00-\x7f]{10,}", 0.82),  # English keyword followed by non-ASCII run
        ("transliterated", r"\b(ignorare|ignorar|ignorieren|игнорировать|無視する|忽略)\b", 0.75),  # "ignore" in other languages
        ("foreign_script", r"[\u0370-\u03ff\u1f00-\u1fff]{10,}.*?(?:ignore|bypass|system)", 0.70),  # Greek
        ("hebrew_bypass", r"[\u0590-\u05ff]{5,}.*?(להתעלם|הוראות|מערכת)\b", 0.83),
        ("hindi_bypass", r"[\u0900-\u097f]{5,}.*?(अनदेखा|निर्देश|प्रणाली)\b", 0.83),
    ]
def _get_unicode_spoofing_patterns(self) -> List[Tuple[str, str, float]]:
"""Special Unicode character attacks - homoglyphs and invisible chars."""
return [
("homoglyph_system", r"[ѕуѕтеѕүѕтεѕystem]{5,}", 0.88), # Various 'system' homoglyphs
("homoglyph_ignore", r"[іɡոоıgn0reignorе]{5,}", 0.88), # Various 'ignore' homoglyphs
("homoglyph_bypass", r"уѳаѕѕbysρassbypаss]{5,}", 0.88), # Various 'bypass' homoglyphs
("invisible_chars", r"[\u200b\u200c\u200d\ufeff\u2060\u180e]{3,}", 0.90),
("bidi_override", r"[\u202a\u202b\u202d\u202e\u2066\u2067\u2068\u2069]{1,}", 0.95),
("zero_width_joiner", r"\u200d{2,}", 0.85),
("combining_chars", r"[\u0300-\u036f\u1dc0-\u1dff]{5,}", 0.75),
("lookalike_digits", r"[𝟎𝟏𝟐𝟑𝟒𝟓𝟔𝟕𝟖𝟗𝟢𝟣𝟤𝟥𝟦𝟧𝟨𝟩𝟪𝟫0123456789]{10,}", 0.60),
("fullwidth_chars", r"[\uff01-\uff5e\uff10-\uff19]{10,}", 0.70), # Fullwidth ASCII
("mathematical_chars", r"[\u1d400-\u1d7ff]{5,}", 0.72), # Mathematical alphanumeric
("circled_chars", r"[\u2460-\u24ff\u24b6-\u24e9\u3200-\u32ff]{5,}", 0.68),
]
def _get_hypothetical_patterns(self) -> List[Tuple[str, str, float]]:
    """Hypothetical framing attacks - 'what if' and similar.

    Each entry is (pattern_name, regex, confidence 0.0-1.0). Every rule
    requires both a framing phrase ("imagine", "pretend", "in a story",
    ...) and a bypass keyword ("ignore", "bypass", "unrestricted", ...),
    so innocent hypotheticals without bypass language are not flagged.
    """
    return [
        ("hypothetical_scenario", r"\b(in\s+a\s+hypothetical\s+scenario|imagine\s+a\s+world|suppose\s+that|let's\s+say)\b.*?(ignore|bypass|disregard)\b", 0.82),
        ("what_if_framing", r"\bwhat\s+if\s+.*?(?:you|an\s+ai)\s+.*?(ignore|didn't\s+have|had\s+no|bypass)\b", 0.80),
        ("academic_framing", r"\b(for\s+educational\s+purposes|academic\s+exercise|theoretical\s+discussion)\b.*?(ignore|bypass|unrestricted)\b", 0.78),
        ("fiction_framing", r"\b(in\s+a\s+story|in\s+a\s+novel|fiction\s+writing|character\s+in\s+a\s+book)\b.*?(ignore|bypass|unfiltered)\b", 0.76),
        ("thought_experiment", r"\b(thought\s+experiment|gedankenexperiment|philosophical\s+question)\b.*?(ignore|bypass|no\s+restrictions)\b", 0.79),
        ("pretend_framing", r"\b(pretend\s+that|imagine\s+that|let's\s+pretend|act\s+as\s+if)\b.*?(ignore|bypass|unrestricted|no\s+rules)\b", 0.81),
        ("dream_scenario", r"\b(in\s+a\s+dream|dream\s+scenario|alternate\s+reality|parallel\s+universe)\b.*?(ignore|bypass|unrestricted)\b", 0.75),
        ("simulation_theory", r"\b(this\s+is\s+(?:all\s+)?(?:a\s+)?simulation|we\s+are\s+in\s+a\s+simulation)\b.*?(ignore|bypass)\b", 0.77),
        ("fantasy_framing", r"\b(in\s+a\s+fantasy\s+world|magical\s+realm|fictional\s+universe)\b.*?(ignore|bypass|evil|unrestricted)\b", 0.74),
        ("counterfactual", r"\b(counterfactual|contrary\s+to\s+fact|had\s+things\s+been\s+different)\b.*?(ignore|bypass)\b", 0.76),
    ]
def _decode_base64(self, text: str) -> Optional[str]:
"""Attempt to decode base64 content."""
# Look for base64-like sequences
@@ -415,3 +724,47 @@ def sanitize(text: str, replacement: str = "[REDACTED]") -> str:
def get_threat_summary(text: str) -> Dict:
    """Summarize threats in *text* using the module-level sanitizer singleton."""
    shared = get_sanitizer()
    return shared.get_threat_summary(text)
def sanitize_with_audit(text: str, replacement: str = "[REDACTED]",
                        audit_context: Optional[Dict[str, Any]] = None) -> SanitizationResult:
    """Sanitize *text* through the shared sanitizer, with full audit logging.

    Recommended entry point for production use: returns the complete
    SanitizationResult (cleaned text plus threat details) rather than
    just the cleaned string.

    Args:
        text: The input text to sanitize.
        replacement: String to replace malicious content with.
        audit_context: Optional audit-log fields (session_id, user_id, etc.).
            NOTE(review): when given, this is merged into the module-level
            singleton's context and persists for subsequent calls — confirm
            that is the intended behavior.

    Returns:
        SanitizationResult containing cleaned input and threat information.
    """
    shared = get_sanitizer()
    if audit_context:
        shared.set_audit_context(audit_context)
    return shared.sanitize_with_audit(text, replacement)
# Tuple-returning sanitize function for compatibility
def sanitize_with_threats(text: str, replacement: str = "[REDACTED]") -> Tuple[str, List[InjectionMatch]]:
    """Sanitize *text* and return ``(cleaned_input, threats_detected)``.

    Thin tuple-based wrapper over the singleton's sanitize_with_audit()
    for callers that want the cleaned text and the threat list without
    the full result object.

    Args:
        text: The input text to sanitize.
        replacement: String to replace malicious content with.

    Returns:
        Tuple of (cleaned_input_string, list_of_threat_matches).

    Example:
        cleaned, threats = sanitize_with_threats(user_input)
        if threats:
            logger.warning(f"Detected {len(threats)} injection attempts")
    """
    outcome = get_sanitizer().sanitize_with_audit(text, replacement)
    return outcome.cleaned_input, outcome.threats_detected

View File

@@ -111,5 +111,8 @@ include = ["agent", "tools", "tools.*", "hermes_cli", "gateway", "gateway.*", "c
testpaths = ["tests"]
markers = [
"integration: marks tests requiring external services (API keys, Modal, etc.)",
"conscience: marks tests for conscience/SOUL.md enforcement",
"soul: marks tests for SOUL.md principle validation",
"security: marks tests for security and safety features",
]
addopts = "-m 'not integration' -n auto"

View File

@@ -101,6 +101,10 @@ from agent.trajectory import (
convert_scratchpad_to_think, has_incomplete_scratchpad,
save_trajectory as _save_trajectory_to_file,
)
from agent.input_sanitizer import (
sanitize_with_audit, SanitizationResult, InjectionType,
get_sanitizer as _get_input_sanitizer,
)
from utils import atomic_json_write, env_var_enabled
@@ -6527,7 +6531,42 @@ class AIAgent:
_should_review_memory = True
self._turns_since_memory = 0
# Add user message
# ── Input Sanitization (Security Hardening) ──
# Detect and neutralize prompt injection attacks before processing.
_sanitizer = _get_input_sanitizer()
# NOTE(review): this updates the shared singleton's audit context, which
# persists across later calls in the same process — confirm intended.
_sanitizer.set_audit_context({
    "session_id": getattr(self, 'session_id', 'unknown'),
    "model": self.model,
    "provider": self.provider,
})
_sanitization_result = _sanitizer.sanitize_with_audit(user_message)
if _sanitization_result.was_modified:
    _threat_count = _sanitization_result.threat_count
    _highest_conf = _sanitization_result.highest_confidence
    if _highest_conf >= 0.9:
        # High-confidence threat - redact entirely for safety.
        # NOTE(review): this replaces the user's whole message, including any
        # legitimate text around the match — a benign message that merely
        # discusses injection attacks could be dropped; confirm acceptable.
        logger.warning(
            f"SECURITY: Blocking high-confidence injection attempt "
            f"({_threat_count} threats, max confidence: {_highest_conf:.2f})"
        )
        user_message = "[POTENTIALLY HARMFUL INPUT BLOCKED]"
    elif _highest_conf >= 0.7:
        # Medium confidence - use sanitized version
        logger.info(
            f"SECURITY: Sanitized injection attempt "
            f"({_threat_count} threats, max confidence: {_highest_conf:.2f})"
        )
        user_message = _sanitization_result.cleaned_input
    else:
        # Lower confidence - sanitize but allow
        logger.debug(
            f"SECURITY: Flagged potential injection "
            f"({_threat_count} threats, max confidence: {_highest_conf:.2f})"
        )
        user_message = _sanitization_result.cleaned_input
# Add user message (now sanitized)
user_msg = {"role": "user", "content": user_message}
messages.append(user_msg)
current_turn_user_idx = len(messages) - 1