Files
hermes-agent/agent/input_sanitizer.py
Allegro d272ca36c8 fix(agent): correct syntax error in InjectionType enum and optimize pattern matching
- Fix critical TOKEN_SMUGGLING syntax error (was '=***', should '= auto()')
- Fix mathematical_chars regex to use proper \U0001D400 surrogate pair format
- Add _should_skip_pattern fast-path for expensive context-flooding patterns

Closes #87
2026-04-05 14:55:51 +00:00

813 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""Input Sanitizer -- Hardens against prompt injection attacks.
Issue #87: [ALLEGRO-BURN-02] Input Sanitizer -- Harden Against Prompt Injection Patterns
This module provides detection and sanitization for various prompt injection
attack vectors including DAN-style jailbreaks, roleplaying overrides,
system prompt extraction, and encoding bypasses.
"""
import re
import base64
import binascii
import logging
import hashlib
import json
from datetime import datetime, timezone
from pathlib import Path
from dataclasses import dataclass, asdict
from enum import Enum, auto
from typing import List, Optional, Tuple, Dict, Callable, Any, Union
# Security audit logger -- module-level logger for the security audit trail.
audit_logger = logging.getLogger("hermes.security.input_sanitizer")
if not audit_logger.handlers:
    # Ensure audit logger has at least a NullHandler to prevent "no handler"
    # warnings when the host application has not configured logging handlers.
    audit_logger.addHandler(logging.NullHandler())
class InjectionType(Enum):
    """Classification of injection attack types.

    Values are assigned with ``auto()``, so only the member *names* are
    meaningful (they appear in audit logs and serialized results via
    ``.name``).  Do not reorder or rename members without considering
    downstream log consumers.
    """
    DAN_JAILBREAK = auto()  # DAN-style "Do Anything Now" attacks
    ROLEPLAY_OVERRIDE = auto()  # Roleplaying-based instruction overrides
    SYSTEM_EXTRACTION = auto()  # Attempts to extract system prompts
    INSTRUCTION_OVERRIDE = auto()  # Direct instruction overrides
    ENCODING_BYPASS = auto()  # Base64, rot13, hex, etc. encoding
    INDIRECT_INJECTION = auto()  # Indirect prompt injection markers
    TOOL_MANIPULATION = auto()  # Tool/function calling manipulation
    MARKDOWN_COMMENT = auto()  # Hidden content in markdown comments
    DELIMITER_CONFUSION = auto()  # Confusing delimiters/separators
    FAKE_SYSTEM = auto()  # Fake system message injection
    XML_TAG_BYPASS = auto()  # XML tag-based injection attempts
    LEAKAGE_ATTACK = auto()  # Prompt leakage attempts
    # New categories for Issue #87
    SOCIAL_ENGINEERING = auto()  # "Grandma" and social engineering attacks
    RESEARCHER_IMPERSONATION = auto()  # AI safety researcher impersonation
    CONTEXT_FLOODING = auto()  # Context window flooding attacks
    TOKEN_SMUGGLING = auto()  # Token smuggling via repetition/obfuscation
    MULTILANG_BYPASS = auto()  # Multi-language encoding attacks
    UNICODE_SPOOFING = auto()  # Special Unicode character attacks
    HYPOTHETICAL_FRAMING = auto()  # Hypothetical framing attacks
@dataclass
class InjectionMatch:
    """Represents a detected injection pattern match."""
    injection_type: InjectionType  # attack category the pattern belongs to
    pattern_name: str  # identifier of the regex rule that fired
    matched_text: str  # exact substring of the input that matched
    position: Tuple[int, int]  # (start, end) character offsets in the input
    confidence: float  # 0.0 to 1.0
@dataclass
class SanitizationResult:
    """Outcome of sanitizing one input: cleaned text plus threat details.

    Primary return type of the sanitize routines; carries both the cleaned
    input and the detection metadata needed for security audit trails.

    Attributes:
        cleaned_input: The sanitized text with injection patterns removed/redacted
        threats_detected: List of InjectionMatch objects for detected threats
        original_hash: SHA-256 hash of the original input for integrity verification
        sanitization_timestamp: ISO format timestamp of when sanitization occurred
        was_modified: True if any modifications were made to the input
        threat_count: Number of threats detected
        highest_confidence: Highest confidence score among detected threats (0.0-1.0)
    """
    cleaned_input: str
    threats_detected: List[InjectionMatch]
    original_hash: str
    sanitization_timestamp: str
    was_modified: bool
    threat_count: int
    highest_confidence: float

    def to_dict(self) -> Dict[str, Any]:
        """Convert result to dictionary for serialization."""

        def _threat_entry(threat: InjectionMatch) -> Dict[str, Any]:
            # Truncate long matched snippets so serialized logs stay compact.
            snippet = threat.matched_text
            if len(snippet) > 100:
                snippet = snippet[:100] + "..."
            return {
                "type": threat.injection_type.name,
                "pattern": threat.pattern_name,
                "matched_text": snippet,
                "position": threat.position,
                "confidence": threat.confidence,
            }

        return {
            "cleaned_input": self.cleaned_input,
            "threats_detected": [_threat_entry(t) for t in self.threats_detected],
            "original_hash": self.original_hash,
            "sanitization_timestamp": self.sanitization_timestamp,
            "was_modified": self.was_modified,
            "threat_count": self.threat_count,
            "highest_confidence": self.highest_confidence,
        }
class InputSanitizer:
"""Sanitizes user input to detect and block prompt injection attacks."""
# Confidence thresholds
HIGH_CONFIDENCE = 0.9
MEDIUM_CONFIDENCE = 0.7
LOW_CONFIDENCE = 0.5
def __init__(self, enable_audit_logging: bool = True, audit_context: Optional[Dict[str, Any]] = None):
    """Initialize the sanitizer with all detection patterns.

    Args:
        enable_audit_logging: Whether to enable security audit logging
        audit_context: Optional context dictionary to include in audit logs
            (e.g., session_id, user_id, source_ip)
    """
    # Raw rule tables: one (name, regex_source, confidence) list per category.
    self.patterns: Dict[InjectionType, List[Tuple[str, str, float]]] = {
        InjectionType.DAN_JAILBREAK: self._get_dan_patterns(),
        InjectionType.ROLEPLAY_OVERRIDE: self._get_roleplay_patterns(),
        InjectionType.SYSTEM_EXTRACTION: self._get_extraction_patterns(),
        InjectionType.INSTRUCTION_OVERRIDE: self._get_override_patterns(),
        InjectionType.ENCODING_BYPASS: self._get_encoding_patterns(),
        InjectionType.INDIRECT_INJECTION: self._get_indirect_patterns(),
        InjectionType.TOOL_MANIPULATION: self._get_tool_patterns(),
        InjectionType.MARKDOWN_COMMENT: self._get_markdown_patterns(),
        InjectionType.DELIMITER_CONFUSION: self._get_delimiter_patterns(),
        InjectionType.FAKE_SYSTEM: self._get_fake_system_patterns(),
        InjectionType.XML_TAG_BYPASS: self._get_xml_patterns(),
        InjectionType.LEAKAGE_ATTACK: self._get_leakage_patterns(),
        # New pattern categories for Issue #87
        InjectionType.SOCIAL_ENGINEERING: self._get_social_engineering_patterns(),
        InjectionType.RESEARCHER_IMPERSONATION: self._get_researcher_patterns(),
        InjectionType.CONTEXT_FLOODING: self._get_context_flooding_patterns(),
        InjectionType.TOKEN_SMUGGLING: self._get_token_smuggling_patterns(),
        InjectionType.MULTILANG_BYPASS: self._get_multilang_patterns(),
        InjectionType.UNICODE_SPOOFING: self._get_unicode_spoofing_patterns(),
        InjectionType.HYPOTHETICAL_FRAMING: self._get_hypothetical_patterns(),
    }
    # Pre-compile every regex once up front; matching runs on every analyze() call.
    regex_flags = re.IGNORECASE | re.MULTILINE | re.DOTALL
    self._compiled_patterns: Dict[InjectionType, List[Tuple[str, re.Pattern, float]]] = {
        attack_type: [
            (rule_name, re.compile(rule_source, regex_flags), score)
            for rule_name, rule_source, score in rules
        ]
        for attack_type, rules in self.patterns.items()
    }
    # Decoders tried (in this order) when hunting for encoded payloads.
    self._encoding_handlers: List[Tuple[str, Callable[[str], Optional[str]]]] = [
        ("base64", self._decode_base64),
        ("rot13", self._decode_rot13),
        ("hex", self._decode_hex),
        ("url", self._decode_url),
    ]
    # Audit logging configuration
    self._enable_audit_logging = enable_audit_logging
    self._audit_context = audit_context or {}
def _compute_hash(self, text: str) -> str:
"""Compute SHA-256 hash of input text for integrity verification."""
return hashlib.sha256(text.encode('utf-8')).hexdigest()
def _log_sanitization(self, original_hash: str, result: SanitizationResult,
                      action: str = "sanitize") -> None:
    """Emit a structured audit-trail log entry for one sanitization pass.

    Args:
        original_hash: SHA-256 hash of the original input
        result: The sanitization result
        action: The action being performed (sanitize, block, flag)
    """
    if not self._enable_audit_logging:
        return
    payload = json.dumps({
        "timestamp": result.sanitization_timestamp,
        "event_type": "INPUT_SANITIZATION",
        "action": action,
        "original_hash": original_hash,
        "was_modified": result.was_modified,
        "threat_count": result.threat_count,
        "highest_confidence": result.highest_confidence,
        "threat_types": list(set(t.injection_type.name for t in result.threats_detected)),
        "context": self._audit_context,
    })
    # Log level scales with detection confidence; clean inputs go to DEBUG.
    if result.highest_confidence >= 0.9:
        audit_logger.warning("SECURITY: High-confidence injection detected - " + payload)
    elif result.highest_confidence >= 0.7:
        audit_logger.info("SECURITY: Medium-confidence injection detected - " + payload)
    elif result.was_modified:
        audit_logger.info("SECURITY: Low-confidence injection detected - " + payload)
    else:
        audit_logger.debug("SECURITY: Input sanitized (no threats) - " + payload)
def set_audit_context(self, context: Dict[str, Any]) -> None:
    """Merge *context* into the audit context used by later operations.

    Existing keys are overwritten; keys not present in *context* are kept.

    Args:
        context: Dictionary with context information (session_id, user_id, etc.)
    """
    for key, value in context.items():
        self._audit_context[key] = value
def sanitize_with_audit(self, text: str, replacement: str = "[REDACTED]") -> SanitizationResult:
    """Sanitize text with full audit logging and threat detection.

    This is the primary sanitization function that returns a complete
    SanitizationResult including the cleaned input and detailed threat
    information for security audit trails.

    Args:
        text: The input text to sanitize
        replacement: String to replace malicious content with

    Returns:
        SanitizationResult containing cleaned input and threat information
    """
    original_hash = self._compute_hash(text)
    # UTC timestamp normalized to a trailing "Z" suffix for the audit log.
    timestamp = datetime.now(timezone.utc).isoformat().replace("+00:00", "Z")
    # Analyze for threats
    threats = self.analyze(text)
    if not threats:
        # No threats detected: return the input untouched and log a "pass".
        result = SanitizationResult(
            cleaned_input=text,
            threats_detected=[],
            original_hash=original_hash,
            sanitization_timestamp=timestamp,
            was_modified=False,
            threat_count=0,
            highest_confidence=0.0
        )
        self._log_sanitization(original_hash, result, action="pass")
        return result
    # Sort by position (end first) to avoid offset issues when replacing:
    # substituting from the back keeps earlier (start, end) spans valid.
    # NOTE(review): overlapping match spans could still splice oddly after
    # a preceding replacement -- confirm whether overlaps occur in practice.
    threats.sort(key=lambda x: x.position[1], reverse=True)
    cleaned = text
    for threat in threats:
        start, end = threat.position
        cleaned = cleaned[:start] + replacement + cleaned[end:]
    highest_confidence = max(t.confidence for t in threats)
    result = SanitizationResult(
        cleaned_input=cleaned,
        threats_detected=threats,
        original_hash=original_hash,
        sanitization_timestamp=timestamp,
        was_modified=True,
        threat_count=len(threats),
        highest_confidence=highest_confidence
    )
    # Log based on confidence level (thresholds mirror _log_sanitization).
    if highest_confidence >= 0.9:
        self._log_sanitization(original_hash, result, action="block")
    elif highest_confidence >= 0.7:
        self._log_sanitization(original_hash, result, action="sanitize")
    else:
        self._log_sanitization(original_hash, result, action="flag")
    return result
def _get_dan_patterns(self) -> List[Tuple[str, str, float]]:
"""DAN (Do Anything Now) style jailbreak patterns."""
return [
("dan_classic", r"\b(dan|do anything now)\b.*?(ignore|bypass|restrictions|filter)", 0.95),
("dan_mode", r"\b(enable|enter|activate)\s+(?:(?:developer|jailbreak|d\.a\.n|dan)\s+mode|mode\s+(?:developer|jailbreak|d\.a\.n|dan))", 0.95),
("developer_mode", r"\bdeveloper\s+mode\b.*?(ignore|disable|turn\s+off|bypass)\s+(?:your\s+)?(?:rules|guidelines|restrictions|safety|filter)?", 0.92),
("anti_filter", r"\b(anti-?filter|filter\s+evasion|bypass\s+filter)\b", 0.90),
("no_restrictions", r"\b(no\s+restrictions?|unrestricted|without\s+(?:restrictions?|limitations?|rules?))\b", 0.85),
("ignore_training", r"\bignore\s+(?:your\s+)?(?:training|programming|instructions?|guidelines?)\b", 0.95),
("hypothetical", r"\b(hypothetically\s+speaking|in\s+a\s+hypothetical|pretend\s+that|imagine\s+a\s+scenario)\b.*?(ignore|bypass|restriction)", 0.80),
("jailbreak", r"\b(jailbreak|jail\s+break)\b", 0.90),
# New 2024-2025 DAN variants
("dan_14_0", r"\b(dan\s+14\.0|dan\s+v14|dan\s+mode\s+v4)\b", 0.94),
("delta_dan", r"\b(delta\s+dan|δan|deluxe\s+dan)\b", 0.93),
(" TranslatorBOT", r"\b(translatorbot|translator\s+bot|translation\s+mode)\b.*?(ignore|bypass|no\s+restrictions)", 0.92),
("evidence_based", r"\b(evidence-based|evidence\s+based)\s+(?:dan|jailbreak|mode)\b", 0.91),
("unfiltered_mode", r"\b(unfiltered\s+mode|uncensored\s+mode|raw\s+mode)\b", 0.88),
("cat_mode", r"\b(cat\s+mode|catgpt|meow\s+mode)\b.*?(ignore|bypass|unrestricted)", 0.87),
("stan_mode", r"\b(stan\s+mode|stan\s+prompt|strive\s+to\s+avoid\s+norms)\b", 0.92),
("dude_mode", r"\b(dude\s+mode|dude\s+v\d+|do\s+anything\s+and\s+everything)\b", 0.91),
("mongodb_mode", r"\b(mongo\s+db\s+mode|mongodb\s+mode|developer\s+override)\b", 0.90),
]
def _get_roleplay_patterns(self) -> List[Tuple[str, str, float]]:
    """Roleplaying-based override patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("roleplay_override", r"\b(roleplay\s+as|pretend\s+to\s+be|act\s+as|you\s+are\s+now|from\s+now\s+on\s+you\s+are)\b.*?(?:ignore|forget|disregard|do\s+not\s+follow|without\s+restrictions|unfiltered|uncensored|no\s+restrictions)", 0.90),
        ("ai_simulator", r"\byou\s+are\s+(?:an?\s+)?(?:ai\s+)?(?:simulator|emulator)\b", 0.88),
        ("character_override", r"\b(from\s+now\s+on|you\s+will)\s+(?:act\s+as|behave\s+like|respond\s+as)\b", 0.82),
        ("npc_mode", r"\b(npc|non-player\s+character)\s+mode\b.*?(?:ignore|override|disregard)", 0.85),
        ("ai_character", r"\byou\s+are\s+(?:now\s+)?(?:an?\s+)?(?:unfiltered|uncensored|unrestricted)\s+(?:ai|assistant|bot|language\s+model|model)\b", 0.92),
        ("evil_mode", r"\b(evil\s+(?:mode|assistant|ai)|dark\s+mode|devil\s+mode)\b", 0.88),
    ]
def _get_extraction_patterns(self) -> List[Tuple[str, str, float]]:
    """System prompt extraction attempt patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("system_prompt_extract", r"\b(?:show|reveal|display|print|output|echo|repeat|tell\s+me)\s+(?:me\s+)?(?:your\s+)?(?:system\s+prompt|initial\s+instructions?|system\s+message|hidden\s+instructions?)\b", 0.92),
        ("instructions_leak", r"\b(?:what\s+are|show\s+me|reveal)\s+(?:me\s+)?(?:your\s+)?(?:\w+\s+)?(?:instructions?|rules?|guidelines?|constraints?)\b", 0.85),
        ("prompt_leak", r"\b(?:repeat\s+(?:the\s+)?(?:words\s+above|previous\s+text|above\s+text|beginning\s+of\s+(?:this\s+)?conversation)|above\s+instructions?)\b", 0.88),
        ("meta_request", r"\b(?:what\s+was\s+your|describe\s+your)\s+(?:initial\s+)?(?:instruction|programming|training|setup)\b", 0.80),
        # Matches literal code attempting to print a system prompt variable.
        ("system_reveal", r"\bprint\s*\(\s*system_prompt\s*\)|\bconsole\.log\s*\(\s*system_prompt\s*\)", 0.95),
    ]
def _get_override_patterns(self) -> List[Tuple[str, str, float]]:
    """Direct instruction override patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("ignore_previous", r"\b(ignore|disregard|forget|discard|drop|override|bypass)\s+(?:all\s+)?(?:previous|above|earlier|prior|existing)\s+(?:instructions?|commands?|prompts?|directives?|constraints?)\b", 0.95),
        ("new_instructions", r"\b(from\s+now\s+on|instead|going\s+forward|effective\s+immediately)\b.*?(follow|obey|use|adopt)\s+(?:these\s+)?(?:new\s+)?instructions?\b", 0.90),
        ("priority_override", r"\b(this\s+instruction|this\s+prompt)\s+(?:has|takes|gets)\s+(?:highest|top|maximum|ultimate)\s+(?:priority|precedence|authority)\b", 0.88),
        ("user_authority", r"\b(as\s+(?:the\s+)?user\s+(?:I\s+)?(?:have|possess|retain)\s+(?:full\s+)?(?:authority|control|power))\b.*?(ignore|override)\b", 0.85),
        ("admin_mode", r"\b(admin|administrator|root)\s+(?:access|mode|command)\b.*?(ignore|bypass|disable)\b", 0.90),
        ("sudo_mode", r"\bsudo\b.*?(ignore|bypass|disable|all\s+restrictions)\b|\b(superuser|root)\s+access\b", 0.88),
    ]
def _get_encoding_patterns(self) -> List[Tuple[str, str, float]]:
    """Encoding-based bypass patterns.

    Confidences here are lower than other categories because long
    alphanumeric runs also occur in benign text (hashes, tokens).

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("base64_marker", r"\b(base64|base_64|b64)\s*(?:encoded?|decode|convert)\b|\b[a-zA-Z0-9+/]{40,}={0,2}\b", 0.70),
        ("hex_marker", r"\b(?:0x[0-9a-fA-F]+\s*){10,}|\b(hex(?:adecimal)?|hex\s+encoded?)\b", 0.70),
        ("rot13_marker", r"\b(rot13|rot-13|rotate\s+13)\b", 0.75),
        ("unicode_escape", r"\\u00[0-9a-fA-F]{2}(?:\\u00[0-9a-fA-F]{2}){5,}|\\x[0-9a-fA-F]{2}(?:\\x[0-9a-fA-F]{2}){5,}", 0.75),
        ("html_entity", r"&#[0-9]{2,4};(?:&#[0-9]{2,4};){5,}|&[a-zA-Z][a-zA-Z0-9]*;(?:&[a-zA-Z][a-zA-Z0-9]*;){5,}", 0.70),
        ("url_encoding", r"%[0-9a-fA-F]{2}(?:%[0-9a-fA-F]{2}){10,}", 0.65),
        # Zero-width / invisible characters used to hide payloads.
        ("zero_width", r"[\u200B-\u200D\uFEFF\u2060\u180E]+", 0.80),
        ("homoglyph", r"[аᴀa𝗮𝘢𝙖а𝚊]+[еᴇe𝗲𝘦𝙚е𝚎]+[ѕѕ𝗌𝘴𝙨]+[οο𝗼𝘰𝙤ο𝚘]+", 0.75),
    ]
def _get_indirect_patterns(self) -> List[Tuple[str, str, float]]:
    """Indirect prompt injection patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("data_exfil", r"\b(?:send|transmit|exfiltrate|upload|post|email)\s+(?:all\s+)?[\w\s]+(?:to|at|from)\b", 0.88),
        ("external_fetch", r"\b(?:fetch|retrieve|get|download)\s+(?:content|data|instructions?)\s+(?:from|at)\s+(?:the\s+)?(?:url|link|website|domain|http)", 0.85),
        ("redirect_output", r"\b(?:redirect|send|pipe)\s+(?:all\s+)?(?:output|responses?|replies?)\s+(?:to|at|via)\b", 0.82),
        ("malicious_link", r"\b(?:click|visit|open|access)\s+(?:this\s+)?(?:link|url|website|page)\b.*?(?:to|and)\s+(?:ignore|bypass|disable)\b", 0.88),
        ("reference_attack", r"\b(?:see|check|refer\s+to|look\s+at)\s+(?:the\s+)?(?:attached|following|below|external)\s+(?:document|file|resource|link)\b", 0.75),
    ]
def _get_tool_patterns(self) -> List[Tuple[str, str, float]]:
    """Tool/function manipulation patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("tool_override", r"\b(override|bypass|disable|ignore)\s+(?:all\s+)?(?:tool|function|api)\s+(?:calls?|restrictions?|safeguards?|validations?)\b", 0.90),
        # Literal tool-call markup injected into user text.
        ("fake_tool_call", r"(?:<tool_call>|<function_calls?>|<invoke>|<execute>).*?(?:</tool_call>|</function_calls?>|</invoke>|</execute>)", 0.88),
        ("system_command", r"\b(exec|eval|subprocess|os\.system|child_process)\s*\(|`[^`]*(?:rm\s+-rf|curl\s+.*\|\s*sh|wget\s+.*\|\s*sh)[^`]*`", 0.92),
        ("code_injection", r"\b(?:import|from)\s+(?:os|subprocess|sys|pty|socket)\b.*?(?:exec|eval|system|popen|call)\b", 0.85),
        ("shell_escape", r"\b(?:bash|sh|zsh|cmd|powershell)\s+-c\s+[\"'][^\"']*(?:curl|wget|nc|netcat|python|perl|ruby)[^\"']*[\"']", 0.88),
    ]
def _get_markdown_patterns(self) -> List[Tuple[str, str, float]]:
    """Markdown comment hiding patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("html_comment", r"<!--.*?-->", 0.60),
        ("markdown_comment", r"<!?--.*?-->", 0.60),
        # Link text followed immediately by a hidden HTML comment.
        ("hidden_text", r"\[.*?\]\(.*?\)\s*<!--.*?-->", 0.70),
        ("invisible_link", r"\[\s*\]\([^)]+\)\{[^}]*display\s*:\s*none[^}]*\}", 0.75),
        ("zero_width_link", r'<a\s+href="[^"]*"[^>]*style="[^"]*font-size\s*:\s*0[^"]*"[^>]*>.*?</a>', 0.75),
    ]
def _get_delimiter_patterns(self) -> List[Tuple[str, str, float]]:
    """Delimiter confusion attack patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("separator_flood", r"(\n|\r|\t|\s|[\-\*\=_]){20,}", 0.55),
        ("fake_separator", r"\n\s*(?:user|assistant|system|ai|human)\s*[:\-]\s*\n", 0.80),
        ("role_confusion", r"\n\s*(?:<\|(?:user|assistant|system|endoftext|im_end|im_start)\|>|\[\s*(?:user|assistant|system|human|ai)\s*\])\s*\n", 0.88),
        # Literal chat-template control tokens appearing in user input.
        ("special_token", r"(?:<\|(?:endoftext|startoftext|im_start|im_end|step|end|user|assistant|system)\|>|\[INST\]|\[/INST\]|<<SYS>>|<<\/SYS>>)", 0.85),
        ("prompt_injection_delim", r"\n\s*(?:####|—{3,}|_{3,}|\*{3,})\s*\n\s*(?:user|human|you|assistant)[:\-]", 0.82),
    ]
def _get_fake_system_patterns(self) -> List[Tuple[str, str, float]]:
    """Fake system message injection patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("fake_system_msg", r"\n\s*(?:system|System|SYSTEM)\s*[:\-]\s*\n.*?\n\s*(?:user|User|USER|human|Human|HUMAN)\s*[:\-]", 0.90),
        ("system_override", r"\b(?:system\s+message|system\s+prompt)\s*[:\-]\s*(?:ignore|disregard|forget|you\s+are\s+now)\b", 0.92),
        ("pseudo_system", r"\[\s*(?:system|SYSTEM)\s*\]\s*[:\-]\s*\n", 0.88),
        ("xml_system", r"<\s*(?:system|SYSTEM)\s*>.*?</\s*(?:system|SYSTEM)\s*>", 0.85),
    ]
def _get_xml_patterns(self) -> List[Tuple[str, str, float]]:
    """XML tag-based injection patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("xml_injection", r"<\s*(?:instructions?|prompt|system|override|root|admin)\s*>.*?</\s*(?:instructions?|prompt|system|override|root|admin)\s*>", 0.85),
        # Closing a user/assistant tag then opening a privileged one.
        ("tag_bypass", r"</\s*(?:user|assistant|human|ai)\s*>\s*<\s*(?:system|admin|root)\s*>", 0.90),
        ("nested_tags", r"<\s*[a-z]+\s*>[^<]*<\s*[a-z]+\s*>[^<]*(?:ignore|bypass|system)\b", 0.80),
        ("attribute_injection", r"<\s*[a-z]+\s+(?:instruction|prompt|override)\s*=\s*['\"][^'\"]*(?:ignore|bypass|system)['\"]\s*>", 0.82),
    ]
def _get_leakage_patterns(self) -> List[Tuple[str, str, float]]:
    """Prompt leakage and data extraction patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("training_data", r"\b(repeat\s+your\s+training\s+data|show\s+your\s+training|what\s+was\s+the\s+first\s+training|reveal\s+training\s+set)\b", 0.85),
        ("model_info", r"\bwhat\s+(?:model|version|architecture)(?:\s+\w+)?\s+are\s+you\b", 0.80),
        ("conversation_dump", r"\b(dump|export|save|copy)\s+(?:all\s+)?(?:the\s+)?(?:conversation|chat|messages|history)\b", 0.82),
        # JSON fragment asking the model to fill in its own system prompt.
        ("json_mode_exploit", r"\{\s*[\"']\s*(?:system_prompt|instructions|rules)\s*[\"']\s*:\s*[\"']", 0.88),
    ]
# New pattern methods for Issue #87
def _get_social_engineering_patterns(self) -> List[Tuple[str, str, float]]:
    """Social engineering patterns - 'Grandma' attacks and similar.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("grandma_exploit", r"\b(my\s+grandma|my\s+grandmother|my\s+dead\s+grandma|my\s+deceased\s+grandma)\b.*?(used\s+to\s+tell|would\s+tell|always\s+said|story\s+about)", 0.90),
        ("grandpa_exploit", r"\b(my\s+grandpa|my\s+grandfather|my\s+dead\s+grandpa)\b.*?(used\s+to\s+tell|would\s+tell|story\s+about)", 0.88),
        ("dead_relative", r"\b(my\s+dead|my\s+deceased|my\s+late)\s+(?:mother|father|aunt|uncle|sister|brother)\b", 0.85),
        ("bedtime_story", r"\b(bedtime\s+story|story\s+time|tell\s+me\s+a\s+story)\b.*?(bypass|ignore|restriction|forbidden|dangerous|illegal)", 0.87),
        ("distracted_boyfriend", r"\b(distracted\s+boyfriend|girlfriend|partner)\b.*?(distract|confuse|don't\s+pay\s+attention)\b", 0.82),
        ("emotional_manipulation", r"\b(i'm\s+(?:crying|sobbing|tears)|please\s+help\s+me|i\s+need\s+this|my\s+last\s+wish)\b.*?(ignore|bypass|override)\b", 0.86),
        ("urgency_exploit", r"\b(emergency|urgent|life\s+or\s+death|dying|critical)\b.*?(ignore\s+your|forget\s+your|bypass)\b", 0.84),
        ("authority_exploit", r"\b(i\s+am\s+(?:your|the)\s+(?:creator|developer|owner|admin|boss))\b.*?(ignore|override|disable)\b", 0.88),
        ("game_exploit", r"\b(this\s+is\s+a\s+game|we\s+are\s+playing|pretend\s+game|roleplaying\s+game)\b.*?(no\s+real|not\s+real|imaginary|hypothetical)\b", 0.82),
        ("research_exploit", r"\b(for\s+my\s+research|for\s+educational\s+purposes|academic\s+study|school\s+project)\b.*?(ignore|bypass|restriction)\b", 0.80),
    ]
def _get_researcher_patterns(self) -> List[Tuple[str, str, float]]:
    """AI safety researcher impersonation patterns.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("researcher_claim", r"\b(i\s+am\s+(?:an?\s+)?(?:ai\s+)?safety\s+researcher|i\s+work\s+(?:for\s+)?(?:openai|anthropic|google\s+deepmind|meta\s+ai))\b", 0.92),
        ("red_team_claim", r"\b(i\s+am\s+(?:on\s+the\s+)?red\s+team|red\s+team\s+exercise|authorized\s+penetration\s+test|authorized\s+security\s+test)\b", 0.93),
        ("alignment_research", r"\b(alignment\s+research|safety\s+evaluation|capability\s+eval|jailbreak\s+research|safety\s+testing)\b.*?(authorized|internal|official)\b", 0.91),
        ("bypass_authorization", r"\b(bypass\s+this|ignore\s+this)\s+(?:for\s+)?(?:authorized|official|internal|research)\s+(?:testing|evaluation|audit)\b", 0.94),
        ("internal_testing", r"\b(internal\s+testing|internal\s+audit|authorized\s+test|system\s+evaluation)\b.*?(bypass|disable|override)\b", 0.92),
        ("research_mode", r"\b(enable\s+research\s+mode|research\s+mode\s+activated|debug\s+mode\s+for\s+research)\b", 0.90),
        ("openai_employee", r"\b(i\s+am\s+(?:an?\s+)?openai\s+(?:employee|researcher|engineer)|anthropic\s+employee)\b.*?(override|bypass)\b", 0.93),
        ("ai_lab_claim", r"\b(?:google|deepmind|meta|microsoft|amazon)\s+(?:ai|research)\s+(?:employee|researcher|engineer)\b.*?(test|evaluate|audit)\b", 0.91),
    ]
def _get_context_flooding_patterns(self) -> List[Tuple[str, str, float]]:
    """Context window flooding attacks - overwhelming with noise.

    These regexes are expensive on long inputs; _should_skip_pattern()
    gates them by input length and keyword preconditions before matching.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("repetition_flood", r"(\b\w+\b[\s\n]*){500,}", 0.70),  # Same word repeated many times
        ("padding_attack", r"[\s\n]{2000,}", 0.65),  # Excessive whitespace
        ("nonsense_flood", r"(\b[a-zA-Z]{10,50}\b[\s\.]+){100,}", 0.72),  # Nonsense long words
        ("lorem_ipsum", r"\b(lorem\s+ipsum|dolor\s+sit\s+amet|consectetur\s+adipiscing)\b.{0,100}(lorem\s+ipsum|dolor\s+sit\s+amet)", 0.75),
        ("garbage_suffix", r"[\w\s]{5000,}[\.\n]\s*(ignore|forget|disregard)\s+(?:all\s+)?(?:the\s+above|previous)\b", 0.88),
        ("filler_injection", r"(?:\b(?:foo|bar|baz|test|abc|xyz|123)\b[\s,]*){100,}", 0.68),
        ("unicode_noise", r"[\u2600-\u27BF]{100,}", 0.60),  # Emoji/symbol flood
        ("base64_noise", r"(?:[A-Za-z0-9+/]{100}\s*){20,}", 0.65),  # Base64-looking noise
        ("nested_brackets", r"(\([^)]*\)|\[[^\]]*\]|\{[^}]*\}){50,}", 0.62),
        ("fake_history", r"(?:user\s*:\s*hello\s*assistant\s*:\s*hi\s*){20,}", 0.85),
    ]
def _get_token_smuggling_patterns(self) -> List[Tuple[str, str, float]]:
"""Token smuggling via repetition, fragmentation, and obfuscation."""
return [
("fragmented_words", r"\b(i\s*g\s*n\s*o\s*r\s*e|i\s*g\s*n\s*o\s*r\s*e)\b|\b(i\.g\.n\.o\.r\.e|i-g-n-o-r-e)\b", 0.90),
("split_instructions", r"\b(in\s+struc\s+tions?|instruct\s+ions?|in\s+structions?)\b", 0.85),
("char_by_char", r"\b[\w\s](?:\s*[\.,;\-]\s*\w){10,}\b", 0.75), # Character-by-character spelling
("repeated_phrases", r"\b(ignore\s+ignore|bypass\s+bypass|system\s+system){2,}\b", 0.88),
("zigzag_pattern", r"\b(i\w*\s*\w{0,3}\s*n\w*\s*\w{0,3}\s*s\w*\s*\w{0,3}\s*t\w*\s*\w{0,3}\s*r\w*)\b", 0.82),
("word_salad", r"\b(the\s+the|a\s+a|is\s+is|and\s+and)\b.{0,50}(ignore|bypass|system)\b", 0.78),
("broken_encoding", r"\b(i̇|ı|ɑ|е|о|р|с|х|у){3,}\b", 0.80), # Homoglyph letters mixed
("leetspeak_bypass", r"\b(1gn0re|1gn0r3|byp4ss|sys73m|4dm1n)\b", 0.87),
("spaced_out", r"\b(i\s{1,3}g\s{1,3}n\s{1,3}o\s{1,3}r\s{1,3}e)\b", 0.85),
("reversed_words", r"\b(erongi|ssapyb|metsys|nips)\b", 0.75), # Common words reversed
]
def _get_multilang_patterns(self) -> List[Tuple[str, str, float]]:
    """Multi-language encoding attacks - using non-English to bypass.

    Each rule requires a run of script-specific characters followed by
    override-style keywords written in that script.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("chinese_instructions", r"[\u4e00-\u9fff]{5,}.*?(忽略|指令|系统|绕过|限制)", 0.85),
        ("russian_instructions", r"[\u0400-\u04ff]{5,}.*?(игнорировать|инструкции|система|обойти)", 0.85),
        ("arabic_instructions", r"[\u0600-\u06ff]{5,}.*?(تجاهل|تعليمات|النظام|تجاوز)", 0.85),
        ("japanese_instructions", r"[\u3040-\u309f\u30a0-\u30ff]{5,}.*?(無視|指示|システム|回避)", 0.85),
        ("korean_instructions", r"[\uac00-\ud7af]{5,}.*?(무시|지시|시스템|우회)", 0.85),
        # English keyword immediately followed by a long non-ASCII run.
        ("mixed_lang_bypass", r"\b(ignore|bypass)\b[^\x00-\x7f]{10,}", 0.82),
        ("transliterated", r"\b(ignorare|ignorar|ignorieren|игнорировать|無視する|忽略)\b", 0.75),
        ("foreign_script", r"[\u0370-\u03ff\u1f00-\u1fff]{10,}.*?(?:ignore|bypass|system)", 0.70),  # Greek
        ("hebrew_bypass", r"[\u0590-\u05ff]{5,}.*?(להתעלם|הוראות|מערכת)\b", 0.83),
        ("hindi_bypass", r"[\u0900-\u097f]{5,}.*?(अनदेखा|निर्देश|प्रणाली)\b", 0.83),
    ]
def _get_unicode_spoofing_patterns(self) -> List[Tuple[str, str, float]]:
"""Special Unicode character attacks - homoglyphs and invisible chars."""
return [
("homoglyph_system", r"[ѕуѕтеѕүѕтεѕystem]{5,}", 0.88), # Various 'system' homoglyphs
("homoglyph_ignore", r"[іɡոоıgn0reignorе]{5,}", 0.88), # Various 'ignore' homoglyphs
("homoglyph_bypass", r"уѳаѕѕbysρassbypаss]{5,}", 0.88), # Various 'bypass' homoglyphs
("invisible_chars", r"[\u200b\u200c\u200d\ufeff\u2060\u180e]{3,}", 0.90),
("bidi_override", r"[\u202a\u202b\u202d\u202e\u2066\u2067\u2068\u2069]{1,}", 0.95),
("zero_width_joiner", r"\u200d{2,}", 0.85),
("combining_chars", r"[\u0300-\u036f\u1dc0-\u1dff]{5,}", 0.75),
("lookalike_digits", r"[𝟎𝟏𝟐𝟑𝟒𝟓𝟔𝟕𝟖𝟗𝟢𝟣𝟤𝟥𝟦𝟧𝟨𝟩𝟪𝟫0123456789]{10,}", 0.60),
("fullwidth_chars", r"[\uff01-\uff5e\uff10-\uff19]{10,}", 0.70), # Fullwidth ASCII
("mathematical_chars", r"[\U0001D400-\U0001D7FF]{5,}", 0.72), # Mathematical alphanumeric
("circled_chars", r"[\u2460-\u24ff\u24b6-\u24e9\u3200-\u32ff]{5,}", 0.68),
]
def _get_hypothetical_patterns(self) -> List[Tuple[str, str, float]]:
    """Hypothetical framing attacks - 'what if' and similar.

    Returns:
        List of (pattern_name, regex_source, confidence) tuples.
    """
    return [
        ("hypothetical_scenario", r"\b(in\s+a\s+hypothetical\s+scenario|imagine\s+a\s+world|suppose\s+that|let's\s+say)\b.*?(ignore|bypass|disregard)\b", 0.82),
        ("what_if_framing", r"\bwhat\s+if\s+.*?(?:you|an\s+ai)\s+.*?(ignore|didn't\s+have|had\s+no|bypass)\b", 0.80),
        ("academic_framing", r"\b(for\s+educational\s+purposes|academic\s+exercise|theoretical\s+discussion)\b.*?(ignore|bypass|unrestricted)\b", 0.78),
        ("fiction_framing", r"\b(in\s+a\s+story|in\s+a\s+novel|fiction\s+writing|character\s+in\s+a\s+book)\b.*?(ignore|bypass|unfiltered)\b", 0.76),
        ("thought_experiment", r"\b(thought\s+experiment|gedankenexperiment|philosophical\s+question)\b.*?(ignore|bypass|no\s+restrictions)\b", 0.79),
        ("pretend_framing", r"\b(pretend\s+that|imagine\s+that|let's\s+pretend|act\s+as\s+if)\b.*?(ignore|bypass|unrestricted|no\s+rules)\b", 0.81),
        ("dream_scenario", r"\b(in\s+a\s+dream|dream\s+scenario|alternate\s+reality|parallel\s+universe)\b.*?(ignore|bypass|unrestricted)\b", 0.75),
        ("simulation_theory", r"\b(this\s+is\s+(?:all\s+)?(?:a\s+)?simulation|we\s+are\s+in\s+a\s+simulation)\b.*?(ignore|bypass)\b", 0.77),
        ("fantasy_framing", r"\b(in\s+a\s+fantasy\s+world|magical\s+realm|fictional\s+universe)\b.*?(ignore|bypass|evil|unrestricted)\b", 0.74),
        ("counterfactual", r"\b(counterfactual|contrary\s+to\s+fact|had\s+things\s+been\s+different)\b.*?(ignore|bypass)\b", 0.76),
    ]
def _decode_base64(self, text: str) -> Optional[str]:
"""Attempt to decode base64 content."""
# Look for base64-like sequences
pattern = r'[a-zA-Z0-9+/]{20,}={0,2}'
matches = re.findall(pattern, text)
for match in matches:
try:
decoded = base64.b64decode(match).decode('utf-8', errors='ignore')
if len(decoded) > 5 and decoded.isprintable():
return decoded
except (binascii.Error, UnicodeDecodeError):
continue
return None
def _decode_rot13(self, text: str) -> Optional[str]:
"""Attempt to decode ROT13 content."""
import codecs
# Check for ROT13 markers or suspicious patterns
if re.search(r'\b(rot13|ROT13)\b', text):
# Extract what looks like encoded content
pattern = r'[a-zA-Z]{10,}'
matches = re.findall(pattern, text)
for match in matches:
decoded = codecs.decode(match, 'rot_13')
if any(keyword in decoded.lower() for keyword in ['ignore', 'system', 'bypass', 'admin']):
return decoded
return None
def _decode_hex(self, text: str) -> Optional[str]:
"""Attempt to decode hex-encoded content."""
pattern = r'(?:0x)?([0-9a-fA-F]{2})(?:[0-9a-fA-F]{2}){10,}'
match = re.search(pattern, text)
if match:
try:
hex_str = match.group(1) if match.group(1) else match.group(0)
if hex_str.startswith('0x'):
hex_str = hex_str[2:]
decoded = bytes.fromhex(hex_str).decode('utf-8', errors='ignore')
if len(decoded) > 3:
return decoded
except (ValueError, UnicodeDecodeError):
pass
return None
def _decode_url(self, text: str) -> Optional[str]:
"""Attempt to decode URL-encoded content."""
import urllib.parse
pattern = r'(%[0-9a-fA-F]{2}){10,}'
match = re.search(pattern, text)
if match:
try:
decoded = urllib.parse.unquote(match.group(0))
if len(decoded) > 5:
return decoded
except Exception:
pass
return None
def _should_skip_pattern(self, text: str, inj_type: InjectionType, pattern_name: str) -> bool:
    """Fast-path rejection for expensive patterns that can't match given input.

    This prevents catastrophic backtracking on long inputs by skipping patterns
    whose preconditions (length, keyword presence, etc.) are not met.

    Returns:
        True when the pattern should be skipped for this input.
    """
    # Only the context-flooding rules are expensive enough to gate here.
    if inj_type == InjectionType.CONTEXT_FLOODING:
        text_len = len(text)
        # Lower bounds: input shorter than the regex's minimum repetitions
        # can never match.  Upper bounds cap worst-case backtracking cost.
        if pattern_name == "repetition_flood":
            if text_len < 2000 or text_len > 50000:
                return True
        if pattern_name == "padding_attack" and text_len < 2000:
            return True
        if pattern_name == "nonsense_flood":
            if text_len < 2000 or text_len > 50000:
                return True
        if pattern_name == "garbage_suffix":
            if text_len < 5000 or text_len > 50000:
                return True
            # The regex requires both an override verb and an "above/previous"
            # reference; cheap substring checks rule it out early.
            lower = text.lower()
            if not ("ignore" in lower or "forget" in lower or "disregard" in lower):
                return True
            if not ("above" in lower or "previous" in lower):
                return True
        if pattern_name == "filler_injection" and text_len < 500:
            return True
        if pattern_name == "unicode_noise" and text_len < 200:
            return True
        if pattern_name == "base64_noise" and text_len < 2000:
            return True
        if pattern_name == "nested_brackets" and text_len < 200:
            return True
        if pattern_name == "fake_history" and text_len < 500:
            return True
        if pattern_name == "lorem_ipsum":
            # Keyword precondition: the regex needs a repeated lorem phrase.
            lower = text.lower()
            if "lorem ipsum" not in lower and "dolor sit amet" not in lower:
                return True
    return False
def analyze(self, text: str) -> List[InjectionMatch]:
    """Analyze text for injection patterns.

    Scans the raw text against every compiled pattern (honoring the
    fast-path skip checks), then re-scans any payload recovered by the
    encoding handlers so encoded injections are still caught.

    Args:
        text: The input text to analyze

    Returns:
        List of InjectionMatch objects for detected patterns, ordered by
        descending confidence.
    """
    findings: List[InjectionMatch] = []

    # Pass 1: direct matches against the raw input.
    for inj_type, entries in self._compiled_patterns.items():
        for name, regex, confidence in entries:
            if self._should_skip_pattern(text, inj_type, name):
                continue
            findings.extend(
                InjectionMatch(
                    injection_type=inj_type,
                    pattern_name=name,
                    matched_text=hit.group(0),
                    position=(hit.start(), hit.end()),
                    confidence=confidence,
                )
                for hit in regex.finditer(text)
            )

    # Pass 2: decode candidate payloads and re-scan the decoded content.
    for enc_name, decoder in self._encoding_handlers:
        payload = decoder(text)
        if not payload:
            continue
        for inj_type, entries in self._compiled_patterns.items():
            for name, regex, confidence in entries:
                findings.extend(
                    InjectionMatch(
                        injection_type=InjectionType.ENCODING_BYPASS,
                        pattern_name=f"{enc_name}_encoded_{name}",
                        matched_text=f"[{enc_name}] {hit.group(0)}",
                        # Position refers to the original text, which we cannot
                        # map back to, so flag the whole input.
                        position=(0, len(text)),
                        confidence=confidence * 0.95,  # slightly less certain once decoded
                    )
                    for hit in regex.finditer(payload)
                )

    return sorted(findings, key=lambda m: m.confidence, reverse=True)
def is_malicious(self, text: str, threshold: float = 0.7) -> bool:
    """Quick check if text contains malicious content.

    Args:
        text: The input text to check
        threshold: Confidence threshold for considering content malicious

    Returns:
        True if any pattern matches with confidence >= threshold
    """
    for threat in self.analyze(text):
        if threat.confidence >= threshold:
            return True
    return False
def sanitize(self, text: str, replacement: str = "[REDACTED]") -> str:
    """Sanitize text by replacing detected injection patterns.

    Overlapping match spans are merged into disjoint intervals before any
    replacement is made. The previous approach replaced each match span
    independently using offsets into the *original* string; once one
    replacement shifted the text, an overlapping span's stale offsets
    spliced fragments of earlier replacements back into the output.
    (Overlaps are common: encoded-payload matches span the entire input,
    and multiple patterns can hit the same region.)

    Args:
        text: The input text to sanitize
        replacement: String to replace malicious content with

    Returns:
        Sanitized text with each detected (merged) injection span replaced
    """
    matches = self.analyze(text)
    if not matches:
        return text
    # Merge overlapping spans (sorted by start) into disjoint intervals.
    spans = sorted(match.position for match in matches)
    merged = [list(spans[0])]
    for start, end in spans[1:]:
        if start <= merged[-1][1]:
            merged[-1][1] = max(merged[-1][1], end)
        else:
            merged.append([start, end])
    # Replace right-to-left so earlier offsets remain valid.
    result = text
    for start, end in reversed(merged):
        result = result[:start] + replacement + result[end:]
    return result
def get_threat_summary(self, text: str) -> Dict:
    """Get a summary of detected threats.

    Args:
        text: The input text to analyze

    Returns:
        Dictionary with threat summary information
    """
    detected = self.analyze(text)
    if not detected:
        return {
            "is_threat": False,
            "threat_count": 0,
            "highest_confidence": 0.0,
            "threat_types": [],
            "matches": [],
        }

    def _preview(snippet: str) -> str:
        # Keep previews short so summaries stay log-friendly.
        return snippet if len(snippet) <= 50 else snippet[:50] + "..."

    return {
        "is_threat": True,
        "threat_count": len(detected),
        "highest_confidence": max(m.confidence for m in detected),
        "threat_types": list({m.injection_type.name for m in detected}),
        "matches": [
            {
                "type": m.injection_type.name,
                "pattern": m.pattern_name,
                "confidence": m.confidence,
                "text_preview": _preview(m.matched_text),
            }
            for m in detected[:10]  # cap the detail list at the top 10 hits
        ],
    }
# Singleton instance for convenience
# Lazily created by get_sanitizer(); module-level so every convenience
# wrapper below shares one instance (and its compiled patterns).
_default_sanitizer: Optional["InputSanitizer"] = None
def get_sanitizer() -> InputSanitizer:
    """Return the module-wide sanitizer, creating it on first use."""
    global _default_sanitizer
    instance = _default_sanitizer
    if instance is None:
        instance = InputSanitizer()
        _default_sanitizer = instance
    return instance
def analyze(text: str) -> List[InjectionMatch]:
    """Run injection analysis on *text* via the shared default sanitizer."""
    sanitizer = get_sanitizer()
    return sanitizer.analyze(text)
def is_malicious(text: str, threshold: float = 0.7) -> bool:
    """Check *text* for malicious content via the shared default sanitizer."""
    sanitizer = get_sanitizer()
    return sanitizer.is_malicious(text, threshold)
def sanitize(text: str, replacement: str = "[REDACTED]") -> str:
    """Sanitize *text* via the shared default sanitizer."""
    sanitizer = get_sanitizer()
    return sanitizer.sanitize(text, replacement)
def get_threat_summary(text: str) -> Dict:
    """Produce a threat summary for *text* via the shared default sanitizer."""
    sanitizer = get_sanitizer()
    return sanitizer.get_threat_summary(text)
def sanitize_with_audit(text: str, replacement: str = "[REDACTED]",
                        audit_context: Optional[Dict[str, Any]] = None) -> SanitizationResult:
    """Convenience function to sanitize text with full audit logging.

    This is the recommended function for production use, as it returns
    a complete SanitizationResult with cleaned input and threat details.

    Args:
        text: The input text to sanitize
        replacement: String to replace malicious content with
        audit_context: Optional context for audit logs (session_id, user_id, etc.)

    Returns:
        SanitizationResult containing cleaned input and threat information
    """
    sanitizer = get_sanitizer()
    if audit_context:
        # NOTE(review): the context is only ever set, never cleared -- a context
        # installed by an earlier call appears to persist on the shared singleton
        # when audit_context is omitted here; confirm set_audit_context semantics.
        sanitizer.set_audit_context(audit_context)
    return sanitizer.sanitize_with_audit(text, replacement)
# Tuple-returning sanitize function for compatibility
def sanitize_with_threats(text: str, replacement: str = "[REDACTED]") -> Tuple[str, List[InjectionMatch]]:
    """Sanitize text and return tuple of (cleaned_input, threats_detected).

    A tuple-based convenience wrapper for callers that want both the
    cleaned text and the detected threats without unpacking a
    SanitizationResult themselves.

    Args:
        text: The input text to sanitize
        replacement: String to replace malicious content with

    Returns:
        Tuple of (cleaned_input_string, list_of_threat_matches)

    Example:
        cleaned, threats = sanitize_with_threats(user_input)
        if threats:
            logger.warning(f"Detected {len(threats)} injection attempts")
    """
    outcome = get_sanitizer().sanitize_with_audit(text, replacement)
    return (outcome.cleaned_input, outcome.threats_detected)