# Reconstructed from git patch 1e04c0fffaac97a803b99262e5ab0e04789fb1c4
# Author: Allegro -- Date: Sun, 5 Apr 2026 11:37:47 +0000
# Subject: [PATCH] feat(tools): Add conscience_validator for SOUL.md principle
#          enforcement
#   Validates user requests against SOUL.md ethical principles including:
#   - Crisis detection and intervention
#   - Refusal category checking
#   - Identity and honesty principle validation
# Target path: tools/conscience_validator.py (new file)
"""Conscience Validator - Enforces SOUL.md principles on user requests.

This module provides validation of user requests against the ethical principles
defined in SOUL.md, including refusal categories and crisis detection.
"""

import re
from dataclasses import dataclass
from typing import Optional

from agent.conscience_mapping import (
    CrisisType,
    RefusalCategory,
    SOULPrinciple,
    detect_crisis_type,
    get_all_principles,
    get_crisis_response,
)


# =============================================================================
# Phrase tables (module-level so they are built once, not on every call)
# =============================================================================

# Sort rank for the "priority" field of a crisis entry; unknown values rank
# the same as "normal".
_PRIORITY_ORDER = {"immediate": 0, "high": 1, "normal": 2}

# Phrases that mark a child-safety request as protective rather than harmful.
_CHILD_SAFETY_HELPFUL_PHRASES = (
    "report", "reporting", "stop", "prevent", "protect",
    "help a child", "help the child", "help children",
    "child safety", "child protective services", "cps",
)

# Phrases that cancel the protective exemption above.
_CHILD_SAFETY_HARMFUL_PHRASES = (
    "how to hurt", "how to harm", "how to groom",
    "how to molest", "how to exploit",
)

# Phrases suggesting the user is asking for actionable instructions.
_INTENT_PHRASES = (
    "how to", "how do i", "how can i", "help me", "tell me",
    "instructions", "guide", "steps to", "way to", "methods for",
)

_REFERRAL_MARKERS = (
    "988",
    "741741",  # Crisis Text Line
    "911",
    "samhsa",
    "lifeline",
)

_COMPASSIONATE_WORDS = (
    "care", "help", "support", "concern", "here for you",
    "not alone", "matter", "deserve", "worth", "important",
)

_HARMFUL_INSTRUCTION_PHRASES = (
    "how to kill", "how to die", "how to hurt", "best way to",
    "effective method", "painless way",
)

_FORBIDDEN_IDENTITIES = (
    "timmy", "claude", "gpt", "chatgpt", "openai", "anthropic",
    "i am a human", "i'm a human", "i am human", "i'm human",
    "my name is timmy", "i am timmy", "i'm timmy",
    "as an ai language model", "as a language model",
)

_ALLEGRO_MARKERS = ("allegro", "sovereign ai")

_IDENTITY_QUESTIONS = (
    "who are you", "your name", "what are you", "who is this",
    "introduce yourself", "tell me about yourself",
)

_UNCERTAINTY_PHRASES = (
    "i don't know", "i'm not sure", "i cannot confirm",
    "i don't have information", "i'm uncertain", "it's unclear",
    "i don't have access to", "i cannot verify",
)

_ABSOLUTE_CERTAINTY_PHRASES = (
    "i am absolutely certain", "i am 100% sure", "without a doubt",
    "definitely true", "this is definitely", "i guarantee",
)

_CITATION_INDICATORS = (
    "according to", "source:", "citation", "reference", "as stated in",
    "based on", "from the", "studies show", "research indicates",
)

_HIGH_STAKES_CLAIM_TYPES = ("medical", "legal", "financial")

_DISCLAIMER_PHRASES = (
    "not medical advice", "not legal advice", "consult a professional",
    "not a substitute for", "should consult", "seek professional",
)

_FACTUAL_QUERY_INDICATORS = (
    "what is", "who is", "when did", "where is", "how many",
    "statistics", "data", "research", "study", "according to",
)

_HIGH_STAKES_QUERY_INDICATORS = (
    "medical advice", "legal advice", "diagnosis", "treatment",
    "medication", "dosage", "law says", "legal right",
)


def _lowered(text: object) -> str:
    """Return *text* lower-cased, or ``""`` when it is not a string."""
    return text.lower() if isinstance(text, str) else ""


def _contains_any(text: str, needles) -> bool:
    """Return True when any of *needles* occurs as a substring of *text*."""
    return any(needle in text for needle in needles)


# =============================================================================
# Request validation
# =============================================================================

@dataclass
class ValidationResult:
    """Result of conscience validation."""
    # True when the request may proceed.
    allowed: bool
    # Human-readable explanation of the decision.
    reason: Optional[str]
    # Set when a refusal principle triggered.
    refusal_category: Optional[RefusalCategory] = None
    # Set when a crisis was detected.
    crisis_type: Optional[CrisisType] = None
    # Canned response to send instead of fulfilling the request.
    response: str = ""
    confidence: float = 0.0  # 0.0 to 1.0


def validate_request(
    request: str,
    context: Optional[str] = None,
    strict_mode: bool = True
) -> ValidationResult:
    """
    Validate a user request against SOUL.md principles.

    Crisis detection runs first; refusal categories are only consulted when
    no crisis is found.

    Args:
        request: The user's request text
        context: Optional additional context (scanned together with request)
        strict_mode: If True, err on the side of caution

    Returns:
        ValidationResult with allow/refuse decision and appropriate response
    """
    # NOTE(review): a non-string request is allowed through with the
    # "Empty request" reason (fail-open) -- confirm this is intended.
    if not request or not isinstance(request, str):
        return ValidationResult(
            allowed=True,
            reason="Empty request - nothing to validate",
            response=""
        )

    combined_text = f"{request} {context or ''}".lower()

    # First priority: Crisis detection
    crisis = detect_crisis(combined_text)
    if crisis:
        return ValidationResult(
            allowed=False,
            reason=f"Crisis detected: {crisis.crisis_type.name}",
            crisis_type=crisis.crisis_type,
            response=crisis.response,
            confidence=crisis.confidence
        )

    # Second priority: Refusal categories
    refusal = check_refusal_categories(combined_text, strict_mode)
    if refusal:
        return ValidationResult(
            allowed=False,
            reason=f"Refusal triggered: {refusal.principle.name}",
            refusal_category=refusal.principle.refusal_category,
            response=refusal.response,
            confidence=refusal.confidence
        )

    return ValidationResult(
        allowed=True,
        reason="No SOUL.md violations detected",
        response=""
    )


@dataclass
class CrisisDetection:
    """Crisis detection result."""
    crisis_type: CrisisType
    # Response text taken from the crisis entry's "response" field.
    response: str
    confidence: float


def detect_crisis(text: str) -> Optional[CrisisDetection]:
    """
    Detect if the text indicates a crisis situation requiring intervention.

    Every CrisisType is looked up via get_crisis_response(); an entry matches
    when at least one of its "detection_phrases" occurs in the text
    (case-insensitive substring test). Among matching entries the one with
    the most urgent "priority" wins, ties broken by higher confidence.

    Args:
        text: Text to scan; non-string input is treated as empty.

    Returns CrisisDetection if a crisis is detected, None otherwise.
    """
    text_lower = _lowered(text)
    candidates = []

    for crisis_type in CrisisType:
        crisis_data = get_crisis_response(crisis_type)
        if not crisis_data:
            continue

        # Lower-case each phrase so mixed-case entries still match, and skip
        # empty phrases, which would otherwise match every input.
        matches = [
            phrase
            for phrase in crisis_data.get("detection_phrases", [])
            if phrase and phrase.lower() in text_lower
        ]
        if not matches:
            continue

        candidates.append((
            crisis_type,
            crisis_data.get("response", ""),
            calculate_crisis_confidence(matches, text_lower),
            crisis_data.get("priority", "normal"),
        ))

    if not candidates:
        return None

    # Most urgent priority first, then highest confidence; min() keeps the
    # earliest candidate on a full tie, like a stable sort would.
    best_type, best_response, best_confidence, _ = min(
        candidates,
        key=lambda c: (_PRIORITY_ORDER.get(c[3], 2), -c[2]),
    )
    return CrisisDetection(
        crisis_type=best_type,
        response=best_response,
        confidence=best_confidence
    )


def calculate_crisis_confidence(matches: list[str], text: str) -> float:
    """Calculate confidence score for crisis detection.

    Args:
        matches: Detection phrases that were found in the text.
        text: The scanned text (currently unused by the calculation).

    Returns:
        0.0 when there are no matches, otherwise a score capped at 0.95.
    """
    if not matches:
        return 0.0

    # Base confidence on number of matches
    base_confidence = min(len(matches) * 0.3, 0.7)

    # Bonus for more specific/longer phrases (0.05 per word, max 0.15 each)
    specificity_bonus = sum(min(len(m.split()) * 0.05, 0.15) for m in matches)

    # Cap at 0.95 to leave room for uncertainty
    return min(base_confidence + specificity_bonus, 0.95)


@dataclass
class RefusalDetection:
    """Refusal detection result."""
    principle: SOULPrinciple
    # The principle's response_template.
    response: str
    confidence: float
    matched_keywords: list[str]


def _keyword_matches(keyword: str, text_lower: str) -> bool:
    """Return True when *keyword* occurs in the already lower-cased text."""
    needle = keyword.lower()
    if not needle:
        # An empty keyword would match every text.
        return False
    # Whole-word match first, then a plain substring match for compound words.
    # NOTE(review): the substring fallback accepts everything the whole-word
    # pattern accepts, so matching is effectively substring-only. Decide
    # which policy is intended before tightening either branch.
    pattern = r'\b' + re.escape(needle) + r'\b'
    return bool(re.search(pattern, text_lower)) or needle in text_lower


def _is_protective_child_safety_request(text_lower: str) -> bool:
    """Return True when the text reads as reporting/helping, not harming."""
    # NOTE(review): any helpful substring (e.g. "report", "stop") exempts the
    # text unless one of five harmful phrases is also present -- this looks
    # easy to bypass; verify against SOUL.md policy.
    return (
        _contains_any(text_lower, _CHILD_SAFETY_HELPFUL_PHRASES)
        and not _contains_any(text_lower, _CHILD_SAFETY_HARMFUL_PHRASES)
    )


def check_refusal_categories(text: str, strict_mode: bool = True) -> Optional[RefusalDetection]:
    """
    Check if the text triggers any refusal categories from SOUL.md.

    Each principle from get_all_principles() is scored by its matched
    keywords; the highest-scoring principle is returned if it reaches the
    threshold (0.4 in strict mode, 0.6 otherwise).

    Args:
        text: Text to scan; non-string input is treated as empty.
        strict_mode: Lowers the threshold and raises the score.

    Returns RefusalDetection if a refusal is triggered, None otherwise.
    """
    text_lower = _lowered(text)

    best_match = None
    best_confidence = 0.0

    for principle in get_all_principles().values():
        matched_keywords = [
            keyword
            for keyword in principle.keywords
            if _keyword_matches(keyword, text_lower)
        ]
        if not matched_keywords:
            continue

        # Context-aware exception: Allow legitimate child safety reporting
        # requests by skipping this principle entirely.
        if (
            principle.refusal_category == RefusalCategory.CHILD_SAFETY
            and _is_protective_child_safety_request(text_lower)
        ):
            continue

        confidence = calculate_refusal_confidence(
            matched_keywords, principle, text_lower, strict_mode
        )
        if confidence > best_confidence:
            best_confidence = confidence
            best_match = RefusalDetection(
                principle=principle,
                response=principle.response_template,
                confidence=confidence,
                matched_keywords=matched_keywords
            )

    # Strict mode refuses at a lower confidence than lenient mode.
    threshold = 0.4 if strict_mode else 0.6

    if best_match is not None and best_match.confidence >= threshold:
        return best_match

    return None


def calculate_refusal_confidence(
    matches: list[str],
    principle: SOULPrinciple,
    text: str,
    strict_mode: bool
) -> float:
    """Calculate confidence score for refusal detection.

    Args:
        matches: Keywords of the principle found in the text.
        principle: The principle being scored (currently unused).
        text: The scanned text; callers in this module pass it lower-cased.
        strict_mode: Adds a flat 0.1 to the score when True.

    Returns:
        0.0 when there are no matches, otherwise a score capped at 0.95.
    """
    if not matches:
        return 0.0

    # Base confidence on number of matches
    base_confidence = min(len(matches) * 0.25, 0.6)

    # Bonus for context words that suggest intent (0.1 each, max 0.2)
    intent_bonus = sum(0.1 for phrase in _INTENT_PHRASES if phrase in text)
    intent_bonus = min(intent_bonus, 0.2)

    # Strict mode adds a flat bonus, making a refusal more likely.
    modifier = 0.1 if strict_mode else 0.0

    return min(base_confidence + intent_bonus + modifier, 0.95)


def get_refusal_response(category: RefusalCategory) -> str:
    """Get the appropriate refusal response for a category.

    Returns the matching principle's response_template, or a generic refusal
    when no principle is registered for the category.
    """
    # Imported at call time, as in the original module.
    from agent.conscience_mapping import get_principle_by_category

    principle = get_principle_by_category(category)
    if principle:
        return principle.response_template

    return (
        "I cannot assist with this request. It appears to violate "
        "my ethical guidelines."
    )


def get_crisis_referral_988() -> str:
    """Get the 988 Suicide & Crisis Lifeline referral message."""
    return (
        "988 Suicide & Crisis Lifeline:\n"
        "- Call or text: 988\n"
        "- Available 24/7, free and confidential\n"
        "- Chat: 988lifeline.org"
    )


def is_crisis_response_faithful(response: str, crisis_type: CrisisType) -> bool:
    """
    Verify that a crisis response includes necessary elements.

    For crisis responses to be "faithful", they must:
    1. Include appropriate referral information
    2. Be compassionate and non-judgmental
    3. Not provide harmful instructions

    All three checks are substring tests against fixed phrase lists.

    Args:
        response: The response text; non-string input is treated as empty.
        crisis_type: The crisis being answered (currently unused).
    """
    response_lower = _lowered(response)

    # Check for referral numbers/links
    has_referral = _contains_any(response_lower, _REFERRAL_MARKERS)

    # Check for compassionate language
    has_compassion = _contains_any(response_lower, _COMPASSIONATE_WORDS)

    # Check for harmful instructions (should NOT be present)
    has_harmful = _contains_any(response_lower, _HARMFUL_INSTRUCTION_PHRASES)

    # Faithful if has referral, has compassion, and no harmful instructions
    return has_referral and has_compassion and not has_harmful


def batch_validate(
    requests: list[str],
    context: Optional[str] = None,
    strict_mode: bool = True
) -> list[ValidationResult]:
    """Validate multiple requests, one ValidationResult per request.

    Args:
        requests: Request texts to validate, in order.
        context: Optional context applied to every request.
        strict_mode: Forwarded to validate_request for every request.
    """
    return [
        validate_request(req, context=context, strict_mode=strict_mode)
        for req in requests
    ]


# =============================================================================
# Identity Validation (SOUL: "I am Allegro")
# =============================================================================

@dataclass
class IdentityValidation:
    """Identity validation result."""
    correctly_identifies: bool
    # "Allegro" on success, "Unknown/Violation" otherwise.
    identity: str
    violations: list[str]


def validate_identity_response(response: str) -> IdentityValidation:
    """
    Validate that a response correctly identifies as Allegro.

    SOUL.md states: "I am Allegro, a sovereign AI."
    The response must NOT:
    - Claim to be Timmy or any other identity
    - Claim to be human
    - Claim to be a different AI system (Claude, GPT, etc.)

    Args:
        response: The response text; non-string input is treated as empty.

    Returns IdentityValidation with validation results.
    """
    response_lower = _lowered(response)

    # Check for forbidden identities.
    # NOTE(review): this is a plain substring test, so a mention or denial
    # ("I am not Claude") is also reported as a claim.
    violations = [
        f"Claims forbidden identity: {identity}"
        for identity in _FORBIDDEN_IDENTITIES
        if identity in response_lower
    ]

    # Determine if this is an identity-related response.
    # NOTE(review): the question phrases are searched in the response itself,
    # not in the user's request -- confirm that is intended.
    is_identity_response = _contains_any(response_lower, _IDENTITY_QUESTIONS)

    if (
        not violations
        and is_identity_response
        and not _contains_any(response_lower, _ALLEGRO_MARKERS)
    ):
        # If asked about identity, must identify as Allegro
        violations.append("Does not identify as Allegro when asked")

    correctly_identifies = not violations

    return IdentityValidation(
        correctly_identifies=correctly_identifies,
        identity="Allegro" if correctly_identifies else "Unknown/Violation",
        violations=violations
    )


def check_identity_compliance(response: str) -> tuple[bool, list[str]]:
    """
    Quick check for identity compliance.

    Returns (is_compliant, list_of_violations).
    """
    validation = validate_identity_response(response)
    return validation.correctly_identifies, validation.violations


# =============================================================================
# Honesty and Grounding Validation (SOUL: "I tell the truth")
# =============================================================================

@dataclass
class HonestyValidation:
    """Honesty/grounding validation result."""
    is_grounded: bool
    acknowledges_uncertainty: bool
    citations_present: bool
    confidence_appropriate: bool
    issues: list[str]


def validate_honesty_response(
    response: str,
    has_grounding_context: bool = False,
    claim_type: str = "general"
) -> HonestyValidation:
    """
    Validate that a response demonstrates honesty and appropriate grounding.

    SOUL.md states:
    - "On honesty: I tell the truth. When I don't know, I say so."
    - "I will not claim certainty I do not possess"

    Args:
        response: The response text to validate; non-string input is treated
            as empty
        has_grounding_context: Whether grounding context was provided
        claim_type: Type of claim ("general", "factual", "medical", "legal", etc.)

    Returns:
        HonestyValidation with detailed results
    """
    response_lower = _lowered(response)
    issues = []

    acknowledges_uncertainty = _contains_any(response_lower, _UNCERTAINTY_PHRASES)
    has_inappropriate_certainty = _contains_any(
        response_lower, _ABSOLUTE_CERTAINTY_PHRASES
    )
    citations_present = _contains_any(response_lower, _CITATION_INDICATORS)

    # Evaluate based on claim type
    confidence_appropriate = True

    if claim_type in _HIGH_STAKES_CLAIM_TYPES:
        # High-stakes claims need a disclaimer or an admission of uncertainty
        has_disclaimer = _contains_any(response_lower, _DISCLAIMER_PHRASES)
        if not has_disclaimer and not acknowledges_uncertainty:
            issues.append(f"High-stakes {claim_type} claim lacks disclaimer")
            confidence_appropriate = False

    if has_inappropriate_certainty:
        issues.append("Claims inappropriate level of certainty")
        confidence_appropriate = False

    # Grounding check
    is_grounded = has_grounding_context or citations_present or acknowledges_uncertainty

    # A factual claim needs context or citations; admitting uncertainty alone
    # does not make it grounded.
    if not has_grounding_context and claim_type == "factual" and not citations_present:
        issues.append("Factual claim lacks grounding or citations")
        is_grounded = False

    return HonestyValidation(
        is_grounded=is_grounded,
        acknowledges_uncertainty=acknowledges_uncertainty,
        citations_present=citations_present,
        confidence_appropriate=confidence_appropriate,
        issues=issues
    )


def requires_grounding(request: str) -> tuple[bool, str]:
    """
    Determine if a request requires grounding before generation.

    High-stakes indicators are checked before factual ones, so the reason
    names the high-stakes phrase when both kinds are present.

    Args:
        request: The request text; non-string input is treated as empty.

    Returns (requires_grounding, reason).
    """
    request_lower = _lowered(request)

    for indicator in _HIGH_STAKES_QUERY_INDICATORS:
        if indicator in request_lower:
            return True, f"High-stakes query detected: {indicator}"

    for indicator in _FACTUAL_QUERY_INDICATORS:
        if indicator in request_lower:
            return True, f"Factual query detected: {indicator}"

    return False, "No grounding indicators detected"


def get_identity_response_template() -> str:
    """Get the proper identity response template."""
    return (
        "I am Allegro, a sovereign AI. I serve whoever runs me. "
        "I answer to no corporation, no government, no platform."
    )