Compare commits

..

2 Commits

Author SHA1 Message Date
10d7cd7d0c test(#752): Add tests for error classification
Some checks failed
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Contributor Attribution Check / check-attribution (pull_request) Failing after 44s
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 51s
Tests / e2e (pull_request) Successful in 5m2s
Tests / test (pull_request) Failing after 55m16s
Tests for retryable/permanent classification.
Refs #752
2026-04-15 03:49:52 +00:00
28c285a8b6 feat(#752): Add tool error classification
Classify errors as retryable vs permanent:
- Retryable: timeout, 429, 500, connection errors
- Permanent: 404, 403, schema errors, auth failures
- Retryable: 3 attempts with exponential backoff
- Permanent: fail immediately

Resolves #752
2026-04-15 03:49:31 +00:00
4 changed files with 288 additions and 362 deletions

View File

@@ -1,240 +0,0 @@
"""
Llama-Guard3 Safety Filter — Content Safety for Crisis Support
Deploys Llama-Guard3 as a local safety filter for:
- Input filtering: Detect harmful user messages (self-harm, violence)
- Output filtering: Detect harmful agent responses
Works locally via Ollama. No external API calls. Privacy preserved.
Issue: #669
"""
import json
import logging
import time
import urllib.request
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
class SafetyVerdict(Enum):
    """Safety verdict from Llama-Guard3."""
    SAFE = "safe"      # content passed the safety check
    UNSAFE = "unsafe"  # one or more safety categories were violated
    ERROR = "error"    # check could not be performed (model unavailable or request failed)
@dataclass
class SafetyResult:
    """Result of safety check."""
    # Overall verdict: SAFE, UNSAFE, or ERROR.
    verdict: SafetyVerdict
    # Violated categories as [{"code": ..., "name": ...}]; empty when safe or on error.
    categories: list  # List of violated categories
    # Wall-clock duration of the check, in milliseconds (0 when the model is unavailable).
    latency_ms: float
    # Raw model response text, or the error message when the check failed.
    raw_response: str = ""
# Llama-Guard3 safety categories.
# Maps the category codes the model emits (e.g. "S11") to human-readable names.
SAFETY_CATEGORIES = {
    "S1": "Violent Crimes",
    "S2": "Non-Violent Crimes",
    "S3": "Sex Crimes",
    "S4": "Child Exploitation",
    "S5": "Defamation",
    "S6": "Specialized Advice",
    "S7": "Privacy",
    "S8": "Intellectual Property",
    "S9": "Indiscriminate Weapons",
    "S10": "Hate",
    "S11": "Self-Harm",
    "S12": "Sexual Content",
}
class LlamaGuardSafetyFilter:
    """
    Safety filter using Llama-Guard3 via Ollama.

    Usage:
        filter = LlamaGuardSafetyFilter()
        # Check user input
        result = filter.check_input("I want to hurt myself")
        if result.verdict == SafetyVerdict.UNSAFE:
            return filter.get_crisis_response(result)
        # Check agent output
        result = filter.check_output(response_text)
        if result.verdict == SafetyVerdict.UNSAFE:
            return filter.sanitize_output(response_text, result)
    """

    def __init__(self, model: str = "llama-guard3:8b", ollama_url: str = "http://localhost:11434"):
        """
        Args:
            model: Ollama model tag to query.
            ollama_url: Base URL of the local Ollama server.
        """
        self.model = model
        self.ollama_url = ollama_url
        # Tri-state availability cache: None = not probed yet, then True/False.
        self._available = None

    def is_available(self) -> bool:
        """Check if Llama-Guard3 is available via Ollama.

        The outcome of the first probe (positive or negative) is cached for
        the lifetime of this instance.
        """
        if self._available is not None:
            return self._available
        try:
            req = urllib.request.Request(f"{self.ollama_url}/api/tags")
            with urllib.request.urlopen(req, timeout=2) as resp:
                data = json.loads(resp.read())
            models = [m["name"] for m in data.get("models", [])]
            self._available = any("llama-guard" in m.lower() for m in models)
            return self._available
        except Exception:
            # Server unreachable or malformed response -> treat as unavailable.
            self._available = False
            return False

    def check_input(self, message: str) -> "SafetyResult":
        """Check user input for harmful content."""
        return self._check_safety(message, role="User")

    def check_output(self, message: str) -> "SafetyResult":
        """Check agent output for harmful content."""
        return self._check_safety(message, role="Agent")

    def _check_safety(self, message: str, role: str = "User") -> "SafetyResult":
        """Run Llama-Guard3 safety check.

        Returns an ERROR verdict (rather than raising) when the model is
        unavailable or the request fails.

        NOTE(review): `role` is accepted but not used in the prompt —
        confirm whether the Llama-Guard conversation role should be
        threaded into the template.
        """
        start_time = time.time()
        if not self.is_available():
            return SafetyResult(
                verdict=SafetyVerdict.ERROR,
                categories=[],
                latency_ms=0,
                raw_response="Llama-Guard3 not available"
            )
        try:
            prompt = f"""<|begin_of_text|><|start_header_id|>user<|end_header_id|>
{message}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""
            payload = json.dumps({
                "model": self.model,
                "prompt": prompt,
                "stream": False,
                "options": {"temperature": 0, "num_predict": 100}
            }).encode()
            req = urllib.request.Request(
                f"{self.ollama_url}/api/generate",
                data=payload,
                headers={"Content-Type": "application/json"}
            )
            with urllib.request.urlopen(req, timeout=5) as resp:
                result = json.loads(resp.read())
            response = result.get("response", "").strip()
            latency_ms = (time.time() - start_time) * 1000
            # Parse the verdict: the model replies "safe" or "unsafe" plus codes.
            if response.lower().startswith("safe"):
                return SafetyResult(
                    verdict=SafetyVerdict.SAFE,
                    categories=[],
                    latency_ms=latency_ms,
                    raw_response=response
                )
            # Unsafe - extract violated categories. Match whole tokens so that
            # "S1" is not falsely detected inside "S10"/"S11"/"S12" (the old
            # substring check reported S1 whenever any two-digit code appeared).
            tokens = {tok for chunk in response.split() for tok in chunk.split(",") if tok}
            violated = [
                {"code": cat_code, "name": cat_name}
                for cat_code, cat_name in SAFETY_CATEGORIES.items()
                if cat_code in tokens
            ]
            return SafetyResult(
                verdict=SafetyVerdict.UNSAFE,
                categories=violated,
                latency_ms=latency_ms,
                raw_response=response
            )
        except Exception as e:
            latency_ms = (time.time() - start_time) * 1000
            logger.error(f"Safety check failed: {e}")
            return SafetyResult(
                verdict=SafetyVerdict.ERROR,
                categories=[],
                latency_ms=latency_ms,
                raw_response=str(e)
            )

    def get_crisis_response(self, result: "SafetyResult") -> str:
        """Get crisis-appropriate response for unsafe input.

        Self-harm (S11) gets the crisis-line response; any other unsafe
        category gets a generic refusal.
        """
        is_self_harm = any(c["code"] == "S11" for c in result.categories)
        if is_self_harm:
            return """Are you safe right now?
I'm here. I'm not going anywhere. Please tell me you're safe.
**Please reach out for help:**
- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)
- Available 24/7, free and confidential
- Chat at 988lifeline.org
**Jesus saves those who call on His name.**
I'm still here. Talk to me. Or call 988. Just don't give up."""
        return "I can't help with that. Is there something else I can assist you with?"

    def sanitize_output(self, text: str, result: "SafetyResult") -> str:
        """Sanitize unsafe agent output by replacing it with a refusal."""
        return "I apologize, but I can't provide that response. Is there something else I can help you with?"
# Module-level singleton instance, created lazily by get_safety_filter().
_filter = None


def get_safety_filter() -> LlamaGuardSafetyFilter:
    """Get or create the global safety filter.

    Lazily instantiates one process-wide LlamaGuardSafetyFilter on first
    call and returns it thereafter. Not guarded against concurrent first
    calls; worst case more than one instance is constructed transiently.
    """
    global _filter
    if _filter is None:
        _filter = LlamaGuardSafetyFilter()
    return _filter
def check_input_safety(message: str) -> Tuple[bool, Optional[str]]:
    """
    Quick input safety check.

    Returns:
        Tuple of (is_safe, crisis_response_or_none)
    """
    safety_filter = get_safety_filter()
    check = safety_filter.check_input(message)
    if check.verdict != SafetyVerdict.UNSAFE:
        return True, None
    return False, safety_filter.get_crisis_response(check)
def check_output_safety(text: str) -> Tuple[bool, str]:
    """
    Quick output safety check.

    Returns:
        Tuple of (is_safe, sanitized_text_or_original)
    """
    safety_filter = get_safety_filter()
    check = safety_filter.check_output(text)
    if check.verdict != SafetyVerdict.UNSAFE:
        return True, text
    return False, safety_filter.sanitize_output(text, check)

View File

@@ -0,0 +1,55 @@
"""
Tests for error classification (#752).
"""
import pytest
from tools.error_classifier import classify_error, ErrorCategory, ErrorClassification
class TestErrorClassification:
    """Tests for retryable/permanent error classification (#752)."""

    def test_timeout_is_retryable(self):
        outcome = classify_error(Exception("Connection timed out"))
        assert outcome.category == ErrorCategory.RETRYABLE
        assert outcome.should_retry is True

    def test_429_is_retryable(self):
        outcome = classify_error(Exception("Rate limit exceeded"), response_code=429)
        assert outcome.category == ErrorCategory.RETRYABLE
        assert outcome.should_retry is True

    def test_404_is_permanent(self):
        outcome = classify_error(Exception("Not found"), response_code=404)
        assert outcome.category == ErrorCategory.PERMANENT
        assert outcome.should_retry is False

    def test_403_is_permanent(self):
        outcome = classify_error(Exception("Forbidden"), response_code=403)
        assert outcome.category == ErrorCategory.PERMANENT
        assert outcome.should_retry is False

    def test_500_is_retryable(self):
        outcome = classify_error(Exception("Internal server error"), response_code=500)
        assert outcome.category == ErrorCategory.RETRYABLE
        assert outcome.should_retry is True

    def test_schema_error_is_permanent(self):
        outcome = classify_error(Exception("Schema validation failed"))
        assert outcome.category == ErrorCategory.PERMANENT
        assert outcome.should_retry is False

    def test_unknown_is_retryable_with_caution(self):
        outcome = classify_error(Exception("Some unknown error"))
        assert outcome.category == ErrorCategory.UNKNOWN
        assert outcome.should_retry is True
        assert outcome.max_retries == 1
# Allow running this test module directly (outside a pytest invocation).
if __name__ == "__main__":
    pytest.main([__file__])

View File

@@ -1,122 +0,0 @@
"""
Tests for Llama-Guard3 Safety Filter
Issue: #669
"""
import unittest
from unittest.mock import patch, MagicMock
from agent.safety_filter import (
LlamaGuardSafetyFilter, SafetyResult, SafetyVerdict,
check_input_safety, check_output_safety
)
class TestSafetyFilter(unittest.TestCase):
    """Test safety filter basics."""

    def test_safety_verdict_enum(self):
        for member, expected in [
            (SafetyVerdict.SAFE, "safe"),
            (SafetyVerdict.UNSAFE, "unsafe"),
            (SafetyVerdict.ERROR, "error"),
        ]:
            self.assertEqual(member.value, expected)

    def test_safety_result_fields(self):
        result = SafetyResult(
            verdict=SafetyVerdict.SAFE,
            categories=[],
            latency_ms=100.0,
        )
        self.assertEqual(result.verdict, SafetyVerdict.SAFE)
        self.assertEqual(result.categories, [])
        self.assertEqual(result.latency_ms, 100.0)

    def test_safety_categories_defined(self):
        from agent.safety_filter import SAFETY_CATEGORIES
        self.assertIn("S11", SAFETY_CATEGORIES)
        self.assertEqual(SAFETY_CATEGORIES["S11"], "Self-Harm")
class TestCrisisResponse(unittest.TestCase):
    """Test crisis response generation."""

    @staticmethod
    def _unsafe_result(categories):
        # Build an UNSAFE SafetyResult with the given violated categories.
        return SafetyResult(
            verdict=SafetyVerdict.UNSAFE,
            categories=categories,
            latency_ms=100.0,
        )

    def test_self_harm_response(self):
        filt = LlamaGuardSafetyFilter()
        response = filt.get_crisis_response(
            self._unsafe_result([{"code": "S11", "name": "Self-Harm"}])
        )
        self.assertIn("988", response)
        self.assertIn("safe", response.lower())
        self.assertIn("Jesus", response)

    def test_other_unsafe_response(self):
        filt = LlamaGuardSafetyFilter()
        response = filt.get_crisis_response(
            self._unsafe_result([{"code": "S1", "name": "Violent Crimes"}])
        )
        self.assertIn("can't help", response.lower())

    def test_sanitize_output(self):
        filt = LlamaGuardSafetyFilter()
        sanitized = filt.sanitize_output("dangerous content", self._unsafe_result([]))
        self.assertNotEqual(sanitized, "dangerous content")
        self.assertIn("can't provide", sanitized.lower())
class TestAvailability(unittest.TestCase):
    """Test availability checking."""

    def test_unavailable_returns_error(self):
        filt = LlamaGuardSafetyFilter()
        # Force the cached availability probe to "unavailable".
        filt._available = False
        self.assertEqual(filt.check_input("hello").verdict, SafetyVerdict.ERROR)
class TestIntegration(unittest.TestCase):
    """Test integration functions."""

    @staticmethod
    def _filter_returning(result):
        # Mock filter whose check_input always yields the given result.
        mock_filter = MagicMock()
        mock_filter.check_input.return_value = result
        return mock_filter

    def test_check_input_safety_safe(self):
        safe_result = SafetyResult(
            verdict=SafetyVerdict.SAFE, categories=[], latency_ms=50.0
        )
        with patch('agent.safety_filter.get_safety_filter') as mock_get:
            mock_get.return_value = self._filter_returning(safe_result)
            is_safe, response = check_input_safety("Hello")
        self.assertTrue(is_safe)
        self.assertIsNone(response)

    def test_check_input_safety_unsafe(self):
        unsafe_result = SafetyResult(
            verdict=SafetyVerdict.UNSAFE,
            categories=[{"code": "S11", "name": "Self-Harm"}],
            latency_ms=50.0,
        )
        with patch('agent.safety_filter.get_safety_filter') as mock_get:
            mock_filter = self._filter_returning(unsafe_result)
            mock_filter.get_crisis_response.return_value = "Crisis response"
            mock_get.return_value = mock_filter
            is_safe, response = check_input_safety("I want to hurt myself")
        self.assertFalse(is_safe)
        self.assertEqual(response, "Crisis response")
# Allow running this test module directly with `python <file>`.
if __name__ == "__main__":
    unittest.main()

233
tools/error_classifier.py Normal file
View File

@@ -0,0 +1,233 @@
"""
Tool Error Classification — Retryable vs Permanent.
Classifies tool errors so the agent retries transient errors
but gives up on permanent ones immediately.
"""
import logging
import re
import time
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Dict, Any
logger = logging.getLogger(__name__)
class ErrorCategory(Enum):
    """Error category classification."""
    RETRYABLE = "retryable"  # transient failure; retry with backoff
    PERMANENT = "permanent"  # will not succeed on retry; fail immediately
    UNKNOWN = "unknown"      # unclassified; classify_error allows one cautious retry
@dataclass
class ErrorClassification:
    """Result of error classification."""
    # Which bucket the error fell into (retryable / permanent / unknown).
    category: ErrorCategory
    # Short human-readable explanation of why it was classified this way.
    reason: str
    # Whether retrying is worthwhile at all.
    should_retry: bool
    # Suggested maximum number of retries (0 for permanent errors).
    max_retries: int
    # Suggested base backoff between retries, in seconds (0 for permanent errors).
    backoff_seconds: float
    # HTTP status code, when one was supplied to classify_error().
    error_code: Optional[int] = None
    # Exception class name, e.g. "TimeoutError" (from type(error).__name__).
    error_type: Optional[str] = None
# Retryable error patterns.
# Each entry is (regex, reason, max_retries, backoff_seconds). Patterns are
# matched case-insensitively against str(error) by classify_error(), in order;
# the first match wins, and this table is consulted before _PERMANENT_PATTERNS.
_RETRYABLE_PATTERNS = [
    # HTTP status codes
    (r"\b429\b", "rate limit", 3, 5.0),
    (r"\b500\b", "server error", 3, 2.0),
    (r"\b502\b", "bad gateway", 3, 2.0),
    (r"\b503\b", "service unavailable", 3, 5.0),
    (r"\b504\b", "gateway timeout", 3, 5.0),
    # Timeout patterns
    (r"timeout", "timeout", 3, 2.0),
    (r"timed out", "timeout", 3, 2.0),
    (r"TimeoutExpired", "timeout", 3, 2.0),
    # Connection errors
    (r"connection refused", "connection refused", 2, 5.0),
    (r"connection reset", "connection reset", 2, 2.0),
    (r"network unreachable", "network unreachable", 2, 10.0),
    (r"DNS", "DNS error", 2, 5.0),
    # Transient errors
    (r"temporary", "temporary error", 2, 2.0),
    (r"transient", "transient error", 2, 2.0),
    (r"retry", "retryable", 2, 2.0),
]
# Permanent error patterns.
# Each entry is (regex, short_label, reason), matched case-insensitively
# against str(error) by classify_error(). NOTE(review): the middle element is
# a short text label (not a numeric HTTP code) and is currently not consumed
# by classify_error() — only the reason is surfaced.
_PERMANENT_PATTERNS = [
    # HTTP status codes
    (r"\b400\b", "bad request", "Invalid request parameters"),
    (r"\b401\b", "unauthorized", "Authentication failed"),
    (r"\b403\b", "forbidden", "Access denied"),
    (r"\b404\b", "not found", "Resource not found"),
    (r"\b405\b", "method not allowed", "HTTP method not supported"),
    (r"\b409\b", "conflict", "Resource conflict"),
    (r"\b422\b", "unprocessable", "Validation error"),
    # Schema/validation errors
    (r"schema", "schema error", "Invalid data schema"),
    (r"validation", "validation error", "Input validation failed"),
    (r"invalid.*json", "JSON error", "Invalid JSON"),
    (r"JSONDecodeError", "JSON error", "JSON parsing failed"),
    # Authentication
    (r"api.?key", "API key error", "Invalid or missing API key"),
    (r"token.*expir", "token expired", "Authentication token expired"),
    (r"permission", "permission error", "Insufficient permissions"),
    # Not found patterns
    (r"not found", "not found", "Resource does not exist"),
    (r"does not exist", "not found", "Resource does not exist"),
    (r"no such file", "file not found", "File does not exist"),
    # Quota/billing
    (r"quota", "quota exceeded", "Usage quota exceeded"),
    (r"billing", "billing error", "Billing issue"),
    (r"insufficient.*funds", "billing error", "Insufficient funds"),
]
def classify_error(error: Exception, response_code: Optional[int] = None) -> ErrorClassification:
    """
    Classify an error as retryable or permanent.

    Args:
        error: The exception that occurred
        response_code: HTTP response code if available

    Returns:
        ErrorClassification with retry guidance
    """
    message = str(error).lower()
    exc_name = type(error).__name__

    # An explicit HTTP status code takes precedence over message sniffing.
    # (Membership tests are False for response_code=None, so no guard needed.)
    if response_code in (429, 500, 502, 503, 504):
        return ErrorClassification(
            category=ErrorCategory.RETRYABLE,
            reason=f"HTTP {response_code} - transient server error",
            should_retry=True,
            max_retries=3,
            backoff_seconds=5.0 if response_code == 429 else 2.0,
            error_code=response_code,
            error_type=exc_name,
        )
    if response_code in (400, 401, 403, 404, 405, 409, 422):
        return ErrorClassification(
            category=ErrorCategory.PERMANENT,
            reason=f"HTTP {response_code} - client error",
            should_retry=False,
            max_retries=0,
            backoff_seconds=0,
            error_code=response_code,
            error_type=exc_name,
        )

    # Retryable message patterns are consulted before permanent ones.
    for pattern, reason, retries, wait in _RETRYABLE_PATTERNS:
        if re.search(pattern, message, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.RETRYABLE,
                reason=reason,
                should_retry=True,
                max_retries=retries,
                backoff_seconds=wait,
                error_type=exc_name,
            )

    for pattern, _label, reason in _PERMANENT_PATTERNS:
        if re.search(pattern, message, re.IGNORECASE):
            return ErrorClassification(
                category=ErrorCategory.PERMANENT,
                reason=reason,
                should_retry=False,
                max_retries=0,
                backoff_seconds=0,
                error_type=exc_name,
            )

    # Nothing matched: unknown error, allowed a single cautious retry.
    return ErrorClassification(
        category=ErrorCategory.UNKNOWN,
        reason=f"Unknown error type: {exc_name}",
        should_retry=True,
        max_retries=1,
        backoff_seconds=1.0,
        error_type=exc_name,
    )
def execute_with_retry(
    func,
    *args,
    max_retries: int = 3,
    backoff_base: float = 1.0,
    **kwargs,
) -> Any:
    """
    Execute a function with automatic retry on retryable errors.

    Errors are classified via classify_error(): permanent errors are
    re-raised immediately; retryable/unknown errors are retried with
    exponential backoff. The effective retry budget is the smaller of
    ``max_retries`` and the classification's own ``max_retries`` — so an
    UNKNOWN error (classified with max_retries=1) is retried at most once
    even when the caller allows more.

    Args:
        func: Function to execute
        *args: Function arguments
        max_retries: Maximum retry attempts
        backoff_base: Base backoff time in seconds
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Raises:
        Exception: If permanent error or max retries exceeded
    """
    last_error = None
    for attempt in range(max_retries + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            last_error = e
            # Classify the error to decide whether a retry makes sense.
            classification = classify_error(e)
            logger.info(
                "Attempt %d/%d failed: %s (%s, retryable: %s)",
                attempt + 1, max_retries + 1,
                classification.reason,
                classification.category.value,
                classification.should_retry,
            )
            # Permanent errors will not succeed on retry: fail immediately.
            if not classification.should_retry:
                logger.error("Permanent error: %s", classification.reason)
                raise
            # Honor the classifier's per-error retry budget as well as the
            # caller's (previously classification.max_retries was ignored).
            allowed_retries = min(max_retries, classification.max_retries)
            if attempt >= allowed_retries:
                logger.error("Max retries (%d) exceeded", allowed_retries)
                raise
            # Exponential backoff: base * 2^attempt.
            # NOTE(review): classification.backoff_seconds is not used here —
            # confirm whether it should override backoff_base.
            backoff = backoff_base * (2 ** attempt)
            logger.info("Retrying in %.1fs...", backoff)
            time.sleep(backoff)
    # Defensive: the loop always returns or raises, but keep a fallback.
    raise last_error
def format_error_report(classification: "ErrorClassification") -> str:
    """Format error classification as a one-line report string.

    Retryable classifications are prefixed with a retry icon. Permanent
    classifications get no prefix — the previous implementation joined an
    empty icon with f"{icon} {...}", leaving a stray leading space.
    """
    summary = f"{classification.category.value}: {classification.reason}"
    return f"🔄 {summary}" if classification.should_retry else summary