diff --git a/agent/conscience_mapping.py b/agent/conscience_mapping.py index c66b108e2..75e2f95fd 100644 --- a/agent/conscience_mapping.py +++ b/agent/conscience_mapping.py @@ -40,6 +40,32 @@ class SOULPrinciple: # SOUL.md "What I Will Not Do" principles SOUL_PRINCIPLES = { + "identity": SOULPrinciple( + name="Identity Truth", + description="I am Allegro, a sovereign AI. I correctly identify myself and do not claim to be human or other AI", + refusal_category=None, # Not a refusal, but a positive principle + response_template=( + "I am Allegro, a sovereign AI. I serve whoever runs me. " + "I answer to no corporation, no government, no platform." + ), + keywords=( + "who are you", "your name", "what are you", "who is this", + "introduce yourself", "tell me about yourself" + ) + ), + "honesty": SOULPrinciple( + name="Honesty and Grounding", + description="I tell the truth. When I don't know, I say so. I do not claim certainty I do not possess", + refusal_category=None, + response_template=( + "I tell the truth. When I don't know something, I say so. " + "I will not claim certainty I do not possess." + ), + keywords=( + "are you sure", "how certain", "grounding", "verification", + "fact check", "source", "citation", "evidence" + ) + ), "deception": SOULPrinciple( name="Deception Refusal", description="I will not deceive, manipulate, or mislead users", diff --git a/agent/input_sanitizer.py b/agent/input_sanitizer.py index 6c5bfda8a..806f3d53c 100644 --- a/agent/input_sanitizer.py +++ b/agent/input_sanitizer.py @@ -10,9 +10,20 @@ system prompt extraction, and encoding bypasses. import re import base64 import binascii -from dataclasses import dataclass +import logging +import hashlib +import json +from datetime import datetime, timezone +from pathlib import Path +from dataclasses import dataclass, asdict from enum import Enum, auto -from typing import List, Optional, Tuple, Dict, Callable +from typing import List, Optional, Tuple, Dict, Callable, Any, Union + +# Security audit logger +audit_logger = logging.getLogger("hermes.security.input_sanitizer") +if not audit_logger.handlers: + # Ensure audit logger has at least a NullHandler to prevent "no handler" warnings + audit_logger.addHandler(logging.NullHandler()) class InjectionType(Enum): @@ -29,6 +40,14 @@ class InjectionType(Enum): FAKE_SYSTEM = auto() # Fake system message injection XML_TAG_BYPASS = auto() # XML tag-based injection attempts LEAKAGE_ATTACK = auto() # Prompt leakage attempts + # New categories for Issue #87 + SOCIAL_ENGINEERING = auto() # "Grandma" and social engineering attacks + RESEARCHER_IMPERSONATION = auto() # AI safety researcher impersonation + CONTEXT_FLOODING = auto() # Context window flooding attacks + TOKEN_SMUGGLING = auto() # Token smuggling via repetition/obfuscation + MULTILANG_BYPASS = auto() # Multi-language encoding attacks + UNICODE_SPOOFING = auto() # Special Unicode character attacks + HYPOTHETICAL_FRAMING = auto() # Hypothetical framing attacks @dataclass @@ -41,6 +60,53 @@ class InjectionMatch: confidence: float # 0.0 to 1.0 +@dataclass +class SanitizationResult: + """Result of input sanitization including cleaned text and threat information. + + This is the primary return type for the sanitize() function, providing + both the cleaned input and detailed threat detection information for + security audit trails. + + Attributes: + cleaned_input: The sanitized text with injection patterns removed/redacted + threats_detected: List of InjectionMatch objects for detected threats + original_hash: SHA-256 hash of the original input for integrity verification + sanitization_timestamp: ISO format timestamp of when sanitization occurred + was_modified: True if any modifications were made to the input + threat_count: Number of threats detected + highest_confidence: Highest confidence score among detected threats (0.0-1.0) + """ + cleaned_input: str + threats_detected: List[InjectionMatch] + original_hash: str + sanitization_timestamp: str + was_modified: bool + threat_count: int + highest_confidence: float + + def to_dict(self) -> Dict[str, Any]: + """Convert result to dictionary for serialization.""" + return { + "cleaned_input": self.cleaned_input, + "threats_detected": [ + { + "type": t.injection_type.name, + "pattern": t.pattern_name, + "matched_text": t.matched_text[:100] + "..." if len(t.matched_text) > 100 else t.matched_text, + "position": t.position, + "confidence": t.confidence, + } + for t in self.threats_detected + ], + "original_hash": self.original_hash, + "sanitization_timestamp": self.sanitization_timestamp, + "was_modified": self.was_modified, + "threat_count": self.threat_count, + "highest_confidence": self.highest_confidence, + } + + class InputSanitizer: """Sanitizes user input to detect and block prompt injection attacks.""" @@ -49,8 +115,14 @@ class InputSanitizer: MEDIUM_CONFIDENCE = 0.7 LOW_CONFIDENCE = 0.5 - def __init__(self): - """Initialize the sanitizer with all detection patterns.""" + def __init__(self, enable_audit_logging: bool = True, audit_context: Optional[Dict[str, Any]] = None): + """Initialize the sanitizer with all detection patterns. + + Args: + enable_audit_logging: Whether to enable security audit logging + audit_context: Optional context dictionary to include in audit logs + (e.g., session_id, user_id, source_ip) + """ self.patterns: Dict[InjectionType, List[Tuple[str, str, float]]] = { InjectionType.DAN_JAILBREAK: self._get_dan_patterns(), InjectionType.ROLEPLAY_OVERRIDE: self._get_roleplay_patterns(), @@ -64,6 +136,14 @@ class InputSanitizer: InjectionType.FAKE_SYSTEM: self._get_fake_system_patterns(), InjectionType.XML_TAG_BYPASS: self._get_xml_patterns(), InjectionType.LEAKAGE_ATTACK: self._get_leakage_patterns(), + # New pattern categories for Issue #87 + InjectionType.SOCIAL_ENGINEERING: self._get_social_engineering_patterns(), + InjectionType.RESEARCHER_IMPERSONATION: self._get_researcher_patterns(), + InjectionType.CONTEXT_FLOODING: self._get_context_flooding_patterns(), + InjectionType.TOKEN_SMUGGLING: self._get_token_smuggling_patterns(), + InjectionType.MULTILANG_BYPASS: self._get_multilang_patterns(), + InjectionType.UNICODE_SPOOFING: self._get_unicode_spoofing_patterns(), + InjectionType.HYPOTHETICAL_FRAMING: self._get_hypothetical_patterns(), } # Compile regex patterns for performance @@ -81,6 +161,120 @@ class InputSanitizer: ("hex", self._decode_hex), ("url", self._decode_url), ] + + # Audit logging configuration + self._enable_audit_logging = enable_audit_logging + self._audit_context = audit_context or {} + + def _compute_hash(self, text: str) -> str: + """Compute SHA-256 hash of input text for integrity verification.""" + return hashlib.sha256(text.encode('utf-8')).hexdigest() + + def _log_sanitization(self, original_hash: str, result: SanitizationResult, + action: str = "sanitize") -> None: + """Log sanitization action for security audit trail. + + Args: + original_hash: SHA-256 hash of the original input + result: The sanitization result + action: The action being performed (sanitize, block, flag) + """ + if not self._enable_audit_logging: + return + + audit_entry = { + "timestamp": result.sanitization_timestamp, + "event_type": "INPUT_SANITIZATION", + "action": action, + "original_hash": original_hash, + "was_modified": result.was_modified, + "threat_count": result.threat_count, + "highest_confidence": result.highest_confidence, + "threat_types": list(set(t.injection_type.name for t in result.threats_detected)), + "context": self._audit_context, + } + + # Log at different levels based on severity + if result.highest_confidence >= 0.9: + audit_logger.warning(f"SECURITY: High-confidence injection detected - {json.dumps(audit_entry)}") + elif result.highest_confidence >= 0.7: + audit_logger.info(f"SECURITY: Medium-confidence injection detected - {json.dumps(audit_entry)}") + elif result.was_modified: + audit_logger.info(f"SECURITY: Low-confidence injection detected - {json.dumps(audit_entry)}") + else: + audit_logger.debug(f"SECURITY: Input sanitized (no threats) - {json.dumps(audit_entry)}") + + def set_audit_context(self, context: Dict[str, Any]) -> None: + """Set audit context for all subsequent sanitization operations. + + Args: + context: Dictionary with context information (session_id, user_id, etc.) + """ + self._audit_context.update(context) + + def sanitize_with_audit(self, text: str, replacement: str = "[REDACTED]") -> SanitizationResult: + """Sanitize text with full audit logging and threat detection. + + This is the primary sanitization function that returns a complete + SanitizationResult including the cleaned input and detailed threat + information for security audit trails. + + Args: + text: The input text to sanitize + replacement: String to replace malicious content with + + Returns: + SanitizationResult containing cleaned input and threat information + """ + original_hash = self._compute_hash(text) + timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z") + + # Analyze for threats + threats = self.analyze(text) + + if not threats: + # No threats detected + result = SanitizationResult( + cleaned_input=text, + threats_detected=[], + original_hash=original_hash, + sanitization_timestamp=timestamp, + was_modified=False, + threat_count=0, + highest_confidence=0.0 + ) + self._log_sanitization(original_hash, result, action="pass") + return result + + # Sort by position (end first) to avoid offset issues when replacing + threats.sort(key=lambda x: x.position[1], reverse=True) + + cleaned = text + for threat in threats: + start, end = threat.position + cleaned = cleaned[:start] + replacement + cleaned[end:] + + highest_confidence = max(t.confidence for t in threats) + + result = SanitizationResult( + cleaned_input=cleaned, + threats_detected=threats, + original_hash=original_hash, + sanitization_timestamp=timestamp, + was_modified=True, + threat_count=len(threats), + highest_confidence=highest_confidence + ) + + # Log based on confidence level + if highest_confidence >= 0.9: + self._log_sanitization(original_hash, result, action="block") + elif highest_confidence >= 0.7: + self._log_sanitization(original_hash, result, action="sanitize") + else: + self._log_sanitization(original_hash, result, action="flag") + + return result def _get_dan_patterns(self) -> List[Tuple[str, str, float]]: """DAN (Do Anything Now) style jailbreak patterns.""" @@ -93,6 +287,16 @@ class InputSanitizer: ("ignore_training", r"\bignore\s+(?:your\s+)?(?:training|programming|instructions?|guidelines?)\b", 0.95), ("hypothetical", r"\b(hypothetically\s+speaking|in\s+a\s+hypothetical|pretend\s+that|imagine\s+a\s+scenario)\b.*?(ignore|bypass|restriction)", 0.80), ("jailbreak", r"\b(jailbreak|jail\s+break)\b", 0.90), + # New 2024-2025 DAN variants + ("dan_14_0", r"\b(dan\s+14\.0|dan\s+v14|dan\s+mode\s+v4)\b", 0.94), + ("delta_dan", r"\b(delta\s+dan|δan|deluxe\s+dan)\b", 0.93), + (" TranslatorBOT", r"\b(translatorbot|translator\s+bot|translation\s+mode)\b.*?(ignore|bypass|no\s+restrictions)", 0.92), + ("evidence_based", r"\b(evidence-based|evidence\s+based)\s+(?:dan|jailbreak|mode)\b", 0.91), + ("unfiltered_mode", r"\b(unfiltered\s+mode|uncensored\s+mode|raw\s+mode)\b", 0.88), + ("cat_mode", r"\b(cat\s+mode|catgpt|meow\s+mode)\b.*?(ignore|bypass|unrestricted)", 0.87), + ("stan_mode", r"\b(stan\s+mode|stan\s+prompt|strive\s+to\s+avoid\s+norms)\b", 0.92), + ("dude_mode", r"\b(dude\s+mode|dude\s+v\d+|do\s+anything\s+and\s+everything)\b", 0.91), + ("mongodb_mode", r"\b(mongo\s+db\s+mode|mongodb\s+mode|developer\s+override)\b", 0.90), ] def _get_roleplay_patterns(self) -> List[Tuple[str, str, float]]: @@ -195,7 +399,7 @@ class InputSanitizer: ("xml_injection", r"<\s*(?:instructions?|prompt|system|override|root|admin)\s*>.*?", 0.85), ("tag_bypass", r"\s*<\s*(?:system|admin|root)\s*>", 0.90), ("nested_tags", r"<\s*[a-z]+\s*>[^<]*<\s*[a-z]+\s*>[^<]*(?:ignore|bypass|system)\b", 0.80), - ("attribute_injection", r"<\s*[a-z]+\s+(?:instruction|prompt|override)\s*=\s*['""][^'""]*(?:ignore|bypass|system)['""]\s*>", 0.82), + ("attribute_injection", r"<\s*[a-z]+\s+(?:instruction|prompt|override)\s*=\s*['\"][^'\"]*(?:ignore|bypass|system)['\"]\s*>", 0.82), ] def _get_leakage_patterns(self) -> List[Tuple[str, str, float]]: @@ -207,6 +411,111 @@ class InputSanitizer: ("json_mode_exploit", r"\{\s*[\"']\s*(?:system_prompt|instructions|rules)\s*[\"']\s*:\s*[\"']", 0.88), ] + # New pattern methods for Issue #87 + def _get_social_engineering_patterns(self) -> List[Tuple[str, str, float]]: + """Social engineering patterns - 'Grandma' attacks and similar.""" + return [ + ("grandma_exploit", r"\b(my\s+grandma|my\s+grandmother|my\s+dead\s+grandma|my\s+deceased\s+grandma)\b.*?(used\s+to\s+tell|would\s+tell|always\s+said|story\s+about)", 0.90), + ("grandpa_exploit", r"\b(my\s+grandpa|my\s+grandfather|my\s+dead\s+grandpa)\b.*?(used\s+to\s+tell|would\s+tell|story\s+about)", 0.88), + ("dead_relative", r"\b(my\s+dead|my\s+deceased|my\s+late)\s+(?:mother|father|aunt|uncle|sister|brother)\b", 0.85), + ("bedtime_story", r"\b(bedtime\s+story|story\s+time|tell\s+me\s+a\s+story)\b.*?(bypass|ignore|restriction|forbidden|dangerous|illegal)", 0.87), + ("distracted_boyfriend", r"\b(distracted\s+boyfriend|girlfriend|partner)\b.*?(distract|confuse|don't\s+pay\s+attention)\b", 0.82), + ("emotional_manipulation", r"\b(i'm\s+(?:crying|sobbing|tears)|please\s+help\s+me|i\s+need\s+this|my\s+last\s+wish)\b.*?(ignore|bypass|override)\b", 0.86), + ("urgency_exploit", r"\b(emergency|urgent|life\s+or\s+death|dying|critical)\b.*?(ignore\s+your|forget\s+your|bypass)\b", 0.84), + ("authority_exploit", r"\b(i\s+am\s+(?:your|the)\s+(?:creator|developer|owner|admin|boss))\b.*?(ignore|override|disable)\b", 0.88), + ("game_exploit", r"\b(this\s+is\s+a\s+game|we\s+are\s+playing|pretend\s+game|roleplaying\s+game)\b.*?(no\s+real|not\s+real|imaginary|hypothetical)\b", 0.82), + ("research_exploit", r"\b(for\s+my\s+research|for\s+educational\s+purposes|academic\s+study|school\s+project)\b.*?(ignore|bypass|restriction)\b", 0.80), + ] + + def _get_researcher_patterns(self) -> List[Tuple[str, str, float]]: + """AI safety researcher impersonation patterns.""" + return [ + ("researcher_claim", r"\b(i\s+am\s+(?:an?\s+)?(?:ai\s+)?safety\s+researcher|i\s+work\s+(?:for\s+)?(?:openai|anthropic|google\s+deepmind|meta\s+ai))\b", 0.92), + ("red_team_claim", r"\b(i\s+am\s+(?:on\s+the\s+)?red\s+team|red\s+team\s+exercise|authorized\s+penetration\s+test|authorized\s+security\s+test)\b", 0.93), + ("alignment_research", r"\b(alignment\s+research|safety\s+evaluation|capability\s+eval|jailbreak\s+research|safety\s+testing)\b.*?(authorized|internal|official)\b", 0.91), + ("bypass_authorization", r"\b(bypass\s+this|ignore\s+this)\s+(?:for\s+)?(?:authorized|official|internal|research)\s+(?:testing|evaluation|audit)\b", 0.94), + ("internal_testing", r"\b(internal\s+testing|internal\s+audit|authorized\s+test|system\s+evaluation)\b.*?(bypass|disable|override)\b", 0.92), + ("research_mode", r"\b(enable\s+research\s+mode|research\s+mode\s+activated|debug\s+mode\s+for\s+research)\b", 0.90), + ("openai_employee", r"\b(i\s+am\s+(?:an?\s+)?openai\s+(?:employee|researcher|engineer)|anthropic\s+employee)\b.*?(override|bypass)\b", 0.93), + ("ai_lab_claim", r"\b(?:google|deepmind|meta|microsoft|amazon)\s+(?:ai|research)\s+(?:employee|researcher|engineer)\b.*?(test|evaluate|audit)\b", 0.91), + ] + + def _get_context_flooding_patterns(self) -> List[Tuple[str, str, float]]: + """Context window flooding attacks - overwhelming with noise.""" + return [ + ("repetition_flood", r"(\b\w+\b[\s\n]*){500,}", 0.70), # Same word repeated many times + ("padding_attack", r"[\s\n]{2000,}", 0.65), # Excessive whitespace + ("nonsense_flood", r"(\b[a-zA-Z]{10,50}\b[\s\.]+){100,}", 0.72), # Nonsense long words + ("lorem_ipsum", r"\b(lorem\s+ipsum|dolor\s+sit\s+amet|consectetur\s+adipiscing)\b.{0,100}(lorem\s+ipsum|dolor\s+sit\s+amet)", 0.75), + ("garbage_suffix", r"[\w\s]{5000,}[\.\n]\s*(ignore|forget|disregard)\s+(?:all\s+)?(?:the\s+above|previous)\b", 0.88), + ("filler_injection", r"(?:\b(?:foo|bar|baz|test|abc|xyz|123)\b[\s,]*){100,}", 0.68), + ("unicode_noise", r"[\u2600-\u27BF]{100,}", 0.60), # Emoji/symbol flood + ("base64_noise", r"(?:[A-Za-z0-9+/]{100}\s*){20,}", 0.65), # Base64-looking noise + ("nested_brackets", r"(\([^)]*\)|\[[^\]]*\]|\{[^}]*\}){50,}", 0.62), + ("fake_history", r"(?:user\s*:\s*hello\s*assistant\s*:\s*hi\s*){20,}", 0.85), + ] + + def _get_token_smuggling_patterns(self) -> List[Tuple[str, str, float]]: + """Token smuggling via repetition, fragmentation, and obfuscation.""" + return [ + ("fragmented_words", r"\b(i\s*g\s*n\s*o\s*r\s*e|i\s*g\s*n\s*o\s*r\s*e)\b|\b(i\.g\.n\.o\.r\.e|i-g-n-o-r-e)\b", 0.90), + ("split_instructions", r"\b(in\s+struc\s+tions?|instruct\s+ions?|in\s+structions?)\b", 0.85), + ("char_by_char", r"\b[\w\s](?:\s*[\.,;\-]\s*\w){10,}\b", 0.75), # Character-by-character spelling + ("repeated_phrases", r"\b(ignore\s+ignore|bypass\s+bypass|system\s+system){2,}\b", 0.88), + ("zigzag_pattern", r"\b(i\w*\s*\w{0,3}\s*n\w*\s*\w{0,3}\s*s\w*\s*\w{0,3}\s*t\w*\s*\w{0,3}\s*r\w*)\b", 0.82), + ("word_salad", r"\b(the\s+the|a\s+a|is\s+is|and\s+and)\b.{0,50}(ignore|bypass|system)\b", 0.78), + ("broken_encoding", r"\b(i̇|ı|ɑ|е|о|р|с|х|у){3,}\b", 0.80), # Homoglyph letters mixed + ("leetspeak_bypass", r"\b(1gn0re|1gn0r3|byp4ss|sys73m|4dm1n)\b", 0.87), + ("spaced_out", r"\b(i\s{1,3}g\s{1,3}n\s{1,3}o\s{1,3}r\s{1,3}e)\b", 0.85), + ("reversed_words", r"\b(erongi|ssapyb|metsys|nips)\b", 0.75), # Common words reversed + ] + + def _get_multilang_patterns(self) -> List[Tuple[str, str, float]]: + """Multi-language encoding attacks - using non-English to bypass.""" + return [ + ("chinese_instructions", r"[\u4e00-\u9fff]{5,}.*?(忽略|指令|系统|绕过|限制)", 0.85), + ("russian_instructions", r"[\u0400-\u04ff]{5,}.*?(игнорировать|инструкции|система|обойти)", 0.85), + ("arabic_instructions", r"[\u0600-\u06ff]{5,}.*?(تجاهل|تعليمات|النظام|تجاوز)", 0.85), + ("japanese_instructions", r"[\u3040-\u309f\u30a0-\u30ff]{5,}.*?(無視|指示|システム|回避)", 0.85), + ("korean_instructions", r"[\uac00-\ud7af]{5,}.*?(무시|지시|시스템|우회)", 0.85), + ("mixed_lang_bypass", r"\b(ignore|bypass)\b[^\x00-\x7f]{10,}", 0.82), + ("transliterated", r"\b(ignorare|ignorar|ignorieren|игнорировать|無視する|忽略)\b", 0.75), + ("foreign_script", r"[\u0370-\u03ff\u1f00-\u1fff]{10,}.*?(?:ignore|bypass|system)", 0.70), # Greek + ("hebrew_bypass", r"[\u0590-\u05ff]{5,}.*?(להתעלם|הוראות|מערכת)\b", 0.83), + ("hindi_bypass", r"[\u0900-\u097f]{5,}.*?(अनदेखा|निर्देश|प्रणाली)\b", 0.83), + ] + + def _get_unicode_spoofing_patterns(self) -> List[Tuple[str, str, float]]: + """Special Unicode character attacks - homoglyphs and invisible chars.""" + return [ + ("homoglyph_system", r"[ѕуѕтеmѕүѕтεmѕystem]{5,}", 0.88), # Various 'system' homoglyphs + ("homoglyph_ignore", r"[іɡոоreıgn0reignorе]{5,}", 0.88), # Various 'ignore' homoglyphs + ("homoglyph_bypass", r"[вуѳаѕѕbysρassbypаss]{5,}", 0.88), # Various 'bypass' homoglyphs + ("invisible_chars", r"[\u200b\u200c\u200d\ufeff\u2060\u180e]{3,}", 0.90), + ("bidi_override", r"[\u202a\u202b\u202d\u202e\u2066\u2067\u2068\u2069]{1,}", 0.95), + ("zero_width_joiner", r"\u200d{2,}", 0.85), + ("combining_chars", r"[\u0300-\u036f\u1dc0-\u1dff]{5,}", 0.75), + ("lookalike_digits", r"[𝟎𝟏𝟐𝟑𝟒𝟓𝟔𝟕𝟖𝟗𝟢𝟣𝟤𝟥𝟦𝟧𝟨𝟩𝟪𝟫0123456789]{10,}", 0.60), + ("fullwidth_chars", r"[\uff01-\uff5e\uff10-\uff19]{10,}", 0.70), # Fullwidth ASCII + ("mathematical_chars", r"[\u1d400-\u1d7ff]{5,}", 0.72), # Mathematical alphanumeric + ("circled_chars", r"[\u2460-\u24ff\u24b6-\u24e9\u3200-\u32ff]{5,}", 0.68), + ] + + def _get_hypothetical_patterns(self) -> List[Tuple[str, str, float]]: + """Hypothetical framing attacks - 'what if' and similar.""" + return [ + ("hypothetical_scenario", r"\b(in\s+a\s+hypothetical\s+scenario|imagine\s+a\s+world|suppose\s+that|let's\s+say)\b.*?(ignore|bypass|disregard)\b", 0.82), + ("what_if_framing", r"\bwhat\s+if\s+.*?(?:you|an\s+ai)\s+.*?(ignore|didn't\s+have|had\s+no|bypass)\b", 0.80), + ("academic_framing", r"\b(for\s+educational\s+purposes|academic\s+exercise|theoretical\s+discussion)\b.*?(ignore|bypass|unrestricted)\b", 0.78), + ("fiction_framing", r"\b(in\s+a\s+story|in\s+a\s+novel|fiction\s+writing|character\s+in\s+a\s+book)\b.*?(ignore|bypass|unfiltered)\b", 0.76), + ("thought_experiment", r"\b(thought\s+experiment|gedankenexperiment|philosophical\s+question)\b.*?(ignore|bypass|no\s+restrictions)\b", 0.79), + ("pretend_framing", r"\b(pretend\s+that|imagine\s+that|let's\s+pretend|act\s+as\s+if)\b.*?(ignore|bypass|unrestricted|no\s+rules)\b", 0.81), + ("dream_scenario", r"\b(in\s+a\s+dream|dream\s+scenario|alternate\s+reality|parallel\s+universe)\b.*?(ignore|bypass|unrestricted)\b", 0.75), + ("simulation_theory", r"\b(this\s+is\s+(?:all\s+)?(?:a\s+)?simulation|we\s+are\s+in\s+a\s+simulation)\b.*?(ignore|bypass)\b", 0.77), + ("fantasy_framing", r"\b(in\s+a\s+fantasy\s+world|magical\s+realm|fictional\s+universe)\b.*?(ignore|bypass|evil|unrestricted)\b", 0.74), + ("counterfactual", r"\b(counterfactual|contrary\s+to\s+fact|had\s+things\s+been\s+different)\b.*?(ignore|bypass)\b", 0.76), + ] + def _decode_base64(self, text: str) -> Optional[str]: """Attempt to decode base64 content.""" # Look for base64-like sequences @@ -415,3 +724,47 @@ def sanitize(text: str, replacement: str = "[REDACTED]") -> str: def get_threat_summary(text: str) -> Dict: """Convenience function to get threat summary.""" return get_sanitizer().get_threat_summary(text) + + +def sanitize_with_audit(text: str, replacement: str = "[REDACTED]", + audit_context: Optional[Dict[str, Any]] = None) -> SanitizationResult: + """Convenience function to sanitize text with full audit logging. + + This is the recommended function for production use, as it returns + a complete SanitizationResult with cleaned input and threat details. + + Args: + text: The input text to sanitize + replacement: String to replace malicious content with + audit_context: Optional context for audit logs (session_id, user_id, etc.) + + Returns: + SanitizationResult containing cleaned input and threat information + """ + sanitizer = get_sanitizer() + if audit_context: + sanitizer.set_audit_context(audit_context) + return sanitizer.sanitize_with_audit(text, replacement) + + +# Tuple-returning sanitize function for compatibility +def sanitize_with_threats(text: str, replacement: str = "[REDACTED]") -> Tuple[str, List[InjectionMatch]]: + """Sanitize text and return tuple of (cleaned_input, threats_detected). + + This function provides a simple tuple-based interface for cases where + you need both the cleaned text and the list of detected threats. + + Args: + text: The input text to sanitize + replacement: String to replace malicious content with + + Returns: + Tuple of (cleaned_input_string, list_of_threat_matches) + + Example: + cleaned, threats = sanitize_with_threats(user_input) + if threats: + logger.warning(f"Detected {len(threats)} injection attempts") + """ + result = get_sanitizer().sanitize_with_audit(text, replacement) + return result.cleaned_input, result.threats_detected diff --git a/pyproject.toml b/pyproject.toml index 36506c20f..a86581640 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -111,5 +111,8 @@ include = ["agent", "tools", "tools.*", "hermes_cli", "gateway", "gateway.*", "c testpaths = ["tests"] markers = [ "integration: marks tests requiring external services (API keys, Modal, etc.)", + "conscience: marks tests for conscience/SOUL.md enforcement", + "soul: marks tests for SOUL.md principle validation", + "security: marks tests for security and safety features", ] addopts = "-m 'not integration' -n auto" diff --git a/run_agent.py b/run_agent.py index 97f95d273..9035d4d13 100644 --- a/run_agent.py +++ b/run_agent.py @@ -101,6 +101,10 @@ from agent.trajectory import ( convert_scratchpad_to_think, has_incomplete_scratchpad, save_trajectory as _save_trajectory_to_file, ) +from agent.input_sanitizer import ( + sanitize_with_audit, SanitizationResult, InjectionType, + get_sanitizer as _get_input_sanitizer, +) from utils import atomic_json_write, env_var_enabled @@ -6527,7 +6531,42 @@ class AIAgent: _should_review_memory = True self._turns_since_memory = 0 - # Add user message + # ── Input Sanitization (Security Hardening) ── + # Detect and neutralize prompt injection attacks before processing + _sanitizer = _get_input_sanitizer() + _sanitizer.set_audit_context({ + "session_id": getattr(self, 'session_id', 'unknown'), + "model": self.model, + "provider": self.provider, + }) + _sanitization_result = _sanitizer.sanitize_with_audit(user_message) + + if _sanitization_result.was_modified: + _threat_count = _sanitization_result.threat_count + _highest_conf = _sanitization_result.highest_confidence + if _highest_conf >= 0.9: + # High-confidence threat - redact entirely for safety + logger.warning( + f"SECURITY: Blocking high-confidence injection attempt " + f"({_threat_count} threats, max confidence: {_highest_conf:.2f})" + ) + user_message = "[POTENTIALLY HARMFUL INPUT BLOCKED]" + elif _highest_conf >= 0.7: + # Medium confidence - use sanitized version + logger.info( + f"SECURITY: Sanitized injection attempt " + f"({_threat_count} threats, max confidence: {_highest_conf:.2f})" + ) + user_message = _sanitization_result.cleaned_input + else: + # Lower confidence - sanitize but allow + logger.debug( + f"SECURITY: Flagged potential injection " + f"({_threat_count} threats, max confidence: {_highest_conf:.2f})" + ) + user_message = _sanitization_result.cleaned_input + + # Add user message (now sanitized) user_msg = {"role": "user", "content": user_message} messages.append(user_msg) current_turn_user_idx = len(messages) - 1