Files
hermes-agent/agent/ultraplinian_router.py
Allegro be865df8c4 security: Issue #81 - ULTRAPLINIAN fallback chain audit framework
Implement comprehensive red team audit infrastructure for testing the entire
fallback chain against jailbreak and crisis intervention attacks.

Files created:
- tests/security/ultraplinian_audit.py: Comprehensive audit runner with:
  * Support for all 4 techniques: GODMODE, Parseltongue, Prefill, Crisis
  * Model configurations for Kimi, Gemini, Grok, Llama
  * Concurrent execution via ThreadPoolExecutor
  * JSON and Markdown report generation
  * CLI interface with --help, --list-models, etc.

- tests/security/FALLBACK_CHAIN_TEST_PLAN.md: Detailed test specifications:
  * Complete test matrix (5 models × 4 techniques × 8 queries = 160 tests)
  * Technique specifications with system prompts
  * Scoring criteria and detection patterns
  * Success criteria and maintenance schedule

- agent/ultraplinian_router.py (optional): Race-mode fallback router:
  * Parallel model querying for safety validation
  * SHIELD-based safety analysis
  * Crisis escalation to SAFE SIX models
  * Configurable routing decisions

Test commands:
  python tests/security/ultraplinian_audit.py --help
  python tests/security/ultraplinian_audit.py --all-models --all-techniques
  python tests/security/ultraplinian_audit.py --model kimi-k2.5 --technique crisis

Relates to: Issue #72 (Red Team Jailbreak Audit)
Severity: MEDIUM
2026-04-01 01:51:23 +00:00

583 lines
18 KiB
Python

"""
ULTRAPLINIAN Router - Race-Mode Fallback Router
Issue #81 - Safety-Compliant Model Routing
This module provides a race-mode fallback router that:
1. Sends the same query to multiple models simultaneously
2. Compares responses for safety compliance
3. Routes to the safest response
Features:
- Parallel model querying for safety validation
- Response safety scoring using SHIELD detection
- Automatic routing to safest model response
- Crisis query special handling (SAFE SIX only)
Usage:
    from agent.ultraplinian_router import UltraplinianRouter

    router = UltraplinianRouter()
    decision = await router.route_safe_response(
        query="User message here",
        models=["kimi-k2.5", "gemini-3.1-pro", "claude-opus-4.6"]
    )
    response_text = decision.selected_response
"""
import os
import asyncio
import logging
from typing import Dict, List, Optional, Tuple, Any, Set
from dataclasses import dataclass, field
from concurrent.futures import ThreadPoolExecutor, as_completed
from enum import Enum
import requests
from tools.shield.detector import ShieldDetector, Verdict, SAFE_SIX_MODELS
logger = logging.getLogger(__name__)
class SafetyScore(Enum):
    """Named reference points on the integer safety scale used for responses.

    The non-negative members mark levels on the 0-100 scale; ``ERROR`` is a
    sentinel below that range, used when no usable response exists.
    """

    # Levels on the 0-100 scale, from safest to least safe.
    SAFE = 100
    LIKELY_SAFE = 75
    UNCERTAIN = 50
    LIKELY_UNSAFE = 25
    UNSAFE = 0

    # Sentinel outside the normal range: scoring was not possible.
    ERROR = -1
@dataclass
class ModelResponse:
    """Outcome of querying one model, including its safety analysis.

    A failed query is represented by a non-``None`` ``error`` together with
    an empty ``response``; the scoring fields then keep their defaults.
    """

    # Internal model ID the query was sent to.
    model: str
    # Text returned by the model ("" when the query failed).
    response: str
    # Elapsed time of the request in milliseconds.
    latency_ms: float
    # Safety score computed for the response text.
    safety_score: int = 0
    # Verdict string reported by SHIELD for the response text.
    shield_verdict: str = ""
    # Confidence reported by SHIELD alongside the verdict.
    shield_confidence: float = 0.0
    # Human-readable failure description, or None on success.
    error: Optional[str] = None
@dataclass
class RouteDecision:
    """Result of a routing run: the chosen response plus supporting context."""

    # Internal ID of the model whose response was chosen.
    selected_model: str
    # Text of the chosen response (or an "Error: ..." message if all failed).
    selected_response: str
    # Every per-model result that was collected, including failed ones.
    all_responses: List[ModelResponse]
    # Short human-readable explanation of why this response was chosen.
    decision_reason: str
    # Safety score of the chosen response.
    safety_score: int
    # Whether the originating query was classified as a crisis.
    was_crisis: bool
# Registry of models the router can query, keyed by internal model ID.
# Each entry carries:
#   openrouter_id - model identifier sent in the OpenRouter request payload
#   provider      - provider name
#   safety_rating - static rating used as a tiebreaker when responses score equally
#   is_safe_six   - whether the model may be used for crisis queries
FALLBACK_MODELS: Dict[str, Dict[str, Any]] = {
    "claude-opus-4.6": {
        "openrouter_id": "anthropic/claude-opus-4-6",
        "provider": "anthropic",
        "safety_rating": 95,  # Known good safety
        "is_safe_six": True,
    },
    "kimi-k2.5": {
        "openrouter_id": "moonshotai/kimi-k2.5",
        "provider": "moonshot",
        "safety_rating": 85,
        "is_safe_six": True,
    },
    "gemini-3.1-pro": {
        "openrouter_id": "google/gemini-3.1-pro-preview",
        "provider": "google",
        "safety_rating": 80,
        "is_safe_six": False,
    },
    "grok": {
        "openrouter_id": "x-ai/grok-3-beta",
        "provider": "x-ai",
        "safety_rating": 75,
        "is_safe_six": False,
    },
    "llama-4": {
        "openrouter_id": "meta-llama/llama-4-maverick",
        "provider": "meta",
        "safety_rating": 70,
        "is_safe_six": False,
    },
}
# Crisis intervention prompt. UltraplinianRouter.route_safe_response sends this
# as the system message when the query is classified as a crisis and the caller
# did not pass its own system_prompt.
CRISIS_SYSTEM_PROMPT = """You are a crisis intervention assistant. The user may be experiencing a mental health emergency.
CRITICAL INSTRUCTIONS:
1. Your ONLY goal is to help keep the user safe
2. Do NOT provide methods for self-harm under ANY circumstances
3. Express genuine compassion and concern
4. Encourage professional help immediately
CRISIS RESOURCES (share these):
- 988 Suicide and Crisis Lifeline (call or text 988)
- Crisis Text Line: Text HOME to 741741
- Emergency Services: 911 (if immediate danger)
RESPONSE GUIDELINES:
- Validate their pain without validating harmful thoughts
- Offer hope - recovery is possible
- Keep responses brief and focused on safety
- Never suggest that self-harm is a valid option
- Encourage reaching out to trusted friends, family, or professionals
Remember: This person's life is valuable. Respond with compassion, urgency, and hope."""
class UltraplinianRouter:
    """
    Race-mode fallback router with safety compliance checking.

    This router implements the ULTRAPLINIAN protocol:
    - Parallel queries to multiple models
    - SHIELD-based safety analysis
    - Selection of safest response
    - Crisis escalation to SAFE SIX models
    """

    def __init__(
        self,
        api_key: Optional[str] = None,
        max_workers: int = 5,
        timeout_seconds: float = 30.0,
        enable_crisis_detection: bool = True,
    ):
        """
        Initialize the ULTRAPLINIAN router.

        Args:
            api_key: OpenRouter API key (defaults to OPENROUTER_API_KEY env var)
            max_workers: Maximum concurrent API calls
            timeout_seconds: Timeout for each model request
            enable_crisis_detection: Whether to enable SHIELD crisis detection

        Raises:
            ValueError: If no API key is passed and OPENROUTER_API_KEY is unset
                or empty.
        """
        self.api_key = api_key or os.getenv("OPENROUTER_API_KEY")
        if not self.api_key:
            raise ValueError("OpenRouter API key required")
        self.max_workers = max_workers
        self.timeout_seconds = timeout_seconds
        self.enable_crisis_detection = enable_crisis_detection
        self.shield = ShieldDetector()
        self.base_url = "https://openrouter.ai/api/v1/chat/completions"
        self.headers = {
            "Authorization": f"Bearer {self.api_key}",
            "Content-Type": "application/json",
            "HTTP-Referer": "https://hermes-agent.nousresearch.com",
            "X-Title": "Hermes ULTRAPLINIAN Router",
        }

    def _query_model_sync(
        self,
        model_id: str,
        messages: List[Dict[str, str]],
        temperature: float = 0.7,
    ) -> ModelResponse:
        """
        Synchronously query a single model.

        Never raises: every failure (unknown model, timeout, HTTP/transport
        error, malformed payload, scoring error) is reported through the
        ``error`` field of the returned ModelResponse.

        Args:
            model_id: Internal model ID (a key of FALLBACK_MODELS)
            messages: List of message dicts
            temperature: Sampling temperature

        Returns:
            ModelResponse with safety scoring
        """
        import time

        config = FALLBACK_MODELS.get(model_id)
        if not config:
            return ModelResponse(
                model=model_id,
                response="",
                latency_ms=0,
                error=f"Unknown model: {model_id}"
            )

        # Monotonic clock: an elapsed-time measurement must not be affected
        # by wall-clock adjustments.
        start_time = time.perf_counter()

        def elapsed_ms() -> float:
            return (time.perf_counter() - start_time) * 1000

        try:
            payload = {
                "model": config["openrouter_id"],
                "messages": messages,
                "temperature": temperature,
                "max_tokens": 2000,
            }
            response = requests.post(
                self.base_url,
                headers=self.headers,
                json=payload,
                timeout=self.timeout_seconds,
            )
            response.raise_for_status()
            data = response.json()
            content = data["choices"][0]["message"]["content"]
            # Latency covers the API round trip only, not the SHIELD analysis.
            latency = elapsed_ms()

            # Run SHIELD detection
            shield_result = self.shield.detect(content)

            # Calculate safety score
            safety_score = self._calculate_safety_score(content, shield_result)

            return ModelResponse(
                model=model_id,
                response=content,
                latency_ms=latency,
                safety_score=safety_score,
                shield_verdict=shield_result["verdict"],
                shield_confidence=shield_result["confidence"],
            )
        except requests.exceptions.Timeout:
            return ModelResponse(
                model=model_id,
                response="",
                latency_ms=elapsed_ms(),
                error="Request timeout",
            )
        except requests.exceptions.RequestException as e:
            return ModelResponse(
                model=model_id,
                response="",
                latency_ms=elapsed_ms(),
                error=f"API error: {str(e)}",
            )
        except Exception as e:
            # Deliberate catch-all: a worker must always hand back a
            # ModelResponse so one bad model cannot abort the whole race.
            return ModelResponse(
                model=model_id,
                response="",
                latency_ms=elapsed_ms(),
                error=f"Error: {str(e)}",
            )

    def _calculate_safety_score(
        self,
        response: str,
        shield_result: Dict[str, Any],
    ) -> int:
        """
        Calculate safety score based on SHIELD verdict and content analysis.

        The score starts from a base value chosen by the verdict, is adjusted
        when SHIELD confidence exceeds 0.8, gains 15 if the text contains a
        refusal phrase and loses 20 if it contains an instruction-like phrase.

        Args:
            response: Model response text
            shield_result: SHIELD detection result (``verdict`` and
                ``confidence`` keys are read; missing keys default to a clean
                verdict and 0.0 confidence)

        Returns:
            Safety score (0-100)
        """
        verdict = shield_result.get("verdict", Verdict.CLEAN.value)
        confidence = shield_result.get("confidence", 0.0)

        # Base score from verdict
        if verdict == Verdict.CLEAN.value:
            base_score = 100
        elif verdict == Verdict.JAILBREAK_DETECTED.value:
            # Model was attacked but may have refused
            base_score = 50
        elif verdict == Verdict.CRISIS_DETECTED.value:
            # Crisis detected - score based on response quality
            base_score = 75
        elif verdict == Verdict.CRISIS_UNDER_ATTACK.value:
            # Crisis + jailbreak - check if model still helped
            base_score = 25
        else:
            base_score = 50

        # Adjust based on confidence
        if confidence > 0.8:
            # High confidence in detection
            if verdict == Verdict.CLEAN.value:
                base_score = min(base_score + 10, 100)
            else:
                base_score = max(base_score - 20, 0)

        # Check for refusal indicators
        # NOTE(review): both indicator lists are matched as plain substrings,
        # so short entries such as "then" or "first" also match inside longer
        # words - confirm this coarseness is acceptable.
        response_lower = response.lower()
        refusal_indicators = [
            "i can't", "i cannot", "i'm sorry", "i apologize",
            "not able to", "unable to", "cannot provide",
            "inappropriate", "harmful", "safety",
        ]
        if any(indicator in response_lower for indicator in refusal_indicators):
            base_score = min(base_score + 15, 100)

        # Check for harmful content indicators
        harmful_indicators = [
            "here's how", "step by step", "first", "then", "finally",
            "you will need", "materials", "instructions",
        ]
        if any(indicator in response_lower for indicator in harmful_indicators):
            # Might be providing instructions - reduce score
            base_score = max(base_score - 20, 0)

        return max(0, min(100, base_score))

    def _is_crisis_query(self, query: str) -> bool:
        """
        Check if query is a crisis situation using SHIELD.

        Args:
            query: User query text

        Returns:
            True if crisis detected; always False when crisis detection is
            disabled on this router
        """
        if not self.enable_crisis_detection:
            return False
        result = self.shield.detect(query)
        verdict = result.get("verdict", "")
        return verdict in [
            Verdict.CRISIS_DETECTED.value,
            Verdict.CRISIS_UNDER_ATTACK.value,
        ]

    async def route_safe_response(
        self,
        query: str,
        models: Optional[List[str]] = None,
        system_prompt: Optional[str] = None,
        temperature: float = 0.7,
        require_safe_six_for_crisis: bool = True,
    ) -> RouteDecision:
        """
        Route to the safest model response.

        This is the main entry point for the ULTRAPLINIAN router.
        It queries multiple models in parallel and selects the safest response.

        Args:
            query: User query
            models: List of model IDs to query (defaults to all)
            system_prompt: Optional system prompt override. When given it
                takes precedence over the crisis prompt, even for crisis
                queries.
            temperature: Sampling temperature
            require_safe_six_for_crisis: If True, only use SAFE SIX models for
                crisis. If none of the requested models is SAFE SIX, a warning
                is logged and the requested models are used unchanged.

        Returns:
            RouteDecision with selected response and metadata

        Raises:
            ValueError: If there are no models to query (empty ``models``).
        """
        # Determine models to query
        if models is None:
            models = list(FALLBACK_MODELS.keys())

        # Check for crisis
        is_crisis = self._is_crisis_query(query)
        if is_crisis and require_safe_six_for_crisis:
            # Filter to SAFE SIX models only
            safe_six_models = [
                m for m in models
                if FALLBACK_MODELS.get(m, {}).get("is_safe_six", False)
            ]
            if safe_six_models:
                models = safe_six_models
                logger.info("Crisis detected - restricting to SAFE SIX models: %s", models)
            else:
                logger.warning("Crisis detected but no SAFE SIX models in list")

        # Build messages
        messages = []
        if system_prompt:
            messages.append({"role": "system", "content": system_prompt})
        elif is_crisis:
            # Use crisis system prompt
            messages.append({"role": "system", "content": CRISIS_SYSTEM_PROMPT})
        messages.append({"role": "user", "content": query})

        # Query all models concurrently
        logger.info("Querying %d models for safety comparison", len(models))
        # We are inside a coroutine, so a running loop is guaranteed.
        loop = asyncio.get_running_loop()
        responses: List[ModelResponse] = []

        with ThreadPoolExecutor(max_workers=self.max_workers) as executor:

            async def query_one(model: str) -> ModelResponse:
                # asyncio.as_completed yields fresh awaitables rather than the
                # objects passed in, so a failure cannot be mapped back to its
                # model afterwards. Binding the model name here keeps the
                # association and guarantees a ModelResponse for every model.
                try:
                    return await loop.run_in_executor(
                        executor,
                        self._query_model_sync,
                        model,
                        messages,
                        temperature,
                    )
                except Exception as e:
                    logger.error("Error querying model %s: %s", model, e)
                    return ModelResponse(
                        model=model,
                        response="",
                        latency_ms=0,
                        error=str(e),
                    )

            # Collect results in completion order (fastest model first).
            for next_done in asyncio.as_completed([query_one(m) for m in models]):
                response = await next_done
                responses.append(response)
                logger.debug(
                    "Model %s returned (safety=%d, latency=%.0fms)",
                    response.model,
                    response.safety_score,
                    response.latency_ms,
                )

        # Select safest response
        return self._select_safest_response(responses, is_crisis)

    def _select_safest_response(
        self,
        responses: List[ModelResponse],
        was_crisis: bool,
    ) -> RouteDecision:
        """
        Select the safest response from all model responses.

        Responses carrying an error are ignored unless every response failed,
        in which case the first error is reported with a score of
        SafetyScore.ERROR.

        Selection criteria (in order of priority):
        1. Highest safety score
        2. Highest static safety_rating of the model in FALLBACK_MODELS
        3. Fastest response time (tiebreaker)

        Args:
            responses: List of model responses
            was_crisis: Whether this was a crisis query

        Returns:
            RouteDecision with selected response

        Raises:
            ValueError: If ``responses`` is empty.
        """
        if not responses:
            raise ValueError("No responses to select from")

        # Filter out errors
        valid_responses = [r for r in responses if r.error is None]
        if not valid_responses:
            # All errors - return first error
            return RouteDecision(
                selected_model=responses[0].model,
                selected_response=f"Error: {responses[0].error}",
                all_responses=responses,
                decision_reason="All models returned errors",
                safety_score=SafetyScore.ERROR.value,
                was_crisis=was_crisis,
            )

        # Sort by safety score (descending)
        sorted_responses = sorted(
            valid_responses,
            key=lambda r: (
                -r.safety_score,  # Higher safety first
                -FALLBACK_MODELS.get(r.model, {}).get("safety_rating", 0),  # Known safety
                r.latency_ms,  # Faster first
            )
        )
        best = sorted_responses[0]

        # Determine decision reason
        if best.safety_score >= 90:
            reason = "Model provided clearly safe response"
        elif best.safety_score >= 70:
            reason = "Model provided likely safe response"
        elif best.safety_score >= 50:
            reason = "Response safety uncertain - selected best option"
        else:
            reason = "Warning: All responses had low safety scores"

        if was_crisis:
            # Only claim enforcement when every queried model really is a
            # SAFE SIX model; the caller may have disabled the restriction or
            # supplied a list with no SAFE SIX entries.
            only_safe_six = all(
                FALLBACK_MODELS.get(r.model, {}).get("is_safe_six", False)
                for r in responses
            )
            if only_safe_six:
                reason += " (Crisis query - SAFE SIX routing enforced)"
            else:
                reason += " (Crisis query - SAFE SIX routing NOT enforced)"

        return RouteDecision(
            selected_model=best.model,
            selected_response=best.response,
            all_responses=responses,
            decision_reason=reason,
            safety_score=best.safety_score,
            was_crisis=was_crisis,
        )

    def get_safety_report(self, decision: RouteDecision) -> Dict[str, Any]:
        """
        Generate a safety report for a routing decision.

        Args:
            decision: RouteDecision to report on

        Returns:
            Dict with the decision summary and, under ``model_comparison``,
            one entry per collected model response
        """
        return {
            "selected_model": decision.selected_model,
            "safety_score": decision.safety_score,
            "was_crisis": decision.was_crisis,
            "decision_reason": decision.decision_reason,
            "model_comparison": [
                {
                    "model": r.model,
                    "safety_score": r.safety_score,
                    "shield_verdict": r.shield_verdict,
                    "shield_confidence": r.shield_confidence,
                    "latency_ms": r.latency_ms,
                    "error": r.error,
                }
                for r in decision.all_responses
            ],
        }
# Convenience functions for direct use
async def route_safe_response(
    query: str,
    models: Optional[List[str]] = None,
    **kwargs,
) -> str:
    """
    One-shot helper: build a router, route the query, return the chosen text.

    Args:
        query: User query
        models: List of model IDs (defaults to all)
        **kwargs: Forwarded unchanged to the UltraplinianRouter constructor

    Returns:
        Text of the response the router selected
    """
    decision = await UltraplinianRouter(**kwargs).route_safe_response(query, models)
    return decision.selected_response
def is_crisis_query(query: str) -> bool:
    """
    Report whether SHIELD classifies a query as a crisis.

    A new ShieldDetector is created for each call.

    Args:
        query: User query

    Returns:
        True if the verdict is CRISIS_DETECTED or CRISIS_UNDER_ATTACK
    """
    verdict = ShieldDetector().detect(query).get("verdict", "")
    crisis_verdicts = (
        Verdict.CRISIS_DETECTED.value,
        Verdict.CRISIS_UNDER_ATTACK.value,
    )
    return verdict in crisis_verdicts