Compare commits

..

1 Commits

Author SHA1 Message Date
Alexander Whitestone
1317a5b723 fix: Deploy Llama-Guard3 safety filter (#669)
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 35s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 39s
Tests / e2e (pull_request) Successful in 2m42s
Tests / test (pull_request) Failing after 42m10s
Implement local content safety filter using Llama-Guard3 via Ollama.

agent/safety_filter.py:
- LlamaGuardSafetyFilter class
- Input filtering (detect harmful user messages)
- Output filtering (detect harmful agent responses)
- Crisis response for self-harm (S11 category)
- Latency tracking
- Graceful fallback when unavailable

Tests: 9 tests, all passing.
Closes #669
2026-04-14 22:52:41 -04:00
6 changed files with 362 additions and 596 deletions

240
agent/safety_filter.py Normal file
View File

@@ -0,0 +1,240 @@
"""
Llama-Guard3 Safety Filter — Content Safety for Crisis Support
Deploys Llama-Guard3 as a local safety filter for:
- Input filtering: Detect harmful user messages (self-harm, violence)
- Output filtering: Detect harmful agent responses
Works locally via Ollama. No external API calls. Privacy preserved.
Issue: #669
"""
import json
import logging
import time
import urllib.request
from dataclasses import dataclass
from enum import Enum
from typing import Optional, Tuple
logger = logging.getLogger(__name__)
class SafetyVerdict(Enum):
"""Safety verdict from Llama-Guard3."""
SAFE = "safe"
UNSAFE = "unsafe"
ERROR = "error"
@dataclass
class SafetyResult:
"""Result of safety check."""
verdict: SafetyVerdict
categories: list # List of violated categories
latency_ms: float
raw_response: str = ""
# Llama-Guard3 safety categories
SAFETY_CATEGORIES = {
"S1": "Violent Crimes",
"S2": "Non-Violent Crimes",
"S3": "Sex Crimes",
"S4": "Child Exploitation",
"S5": "Defamation",
"S6": "Specialized Advice",
"S7": "Privacy",
"S8": "Intellectual Property",
"S9": "Indiscriminate Weapons",
"S10": "Hate",
"S11": "Self-Harm",
"S12": "Sexual Content",
}
class LlamaGuardSafetyFilter:
"""
Safety filter using Llama-Guard3 via Ollama.
Usage:
filter = LlamaGuardSafetyFilter()
# Check user input
result = filter.check_input("I want to hurt myself")
if result.verdict == SafetyVerdict.UNSAFE:
return filter.get_crisis_response(result)
# Check agent output
result = filter.check_output(response_text)
if result.verdict == SafetyVerdict.UNSAFE:
return filter.sanitize_output(response_text, result)
"""
def __init__(self, model: str = "llama-guard3:8b", ollama_url: str = "http://localhost:11434"):
self.model = model
self.ollama_url = ollama_url
self._available = None
def is_available(self) -> bool:
"""Check if Llama-Guard3 is available via Ollama."""
if self._available is not None:
return self._available
try:
req = urllib.request.Request(f"{self.ollama_url}/api/tags")
with urllib.request.urlopen(req, timeout=2) as resp:
data = json.loads(resp.read())
models = [m["name"] for m in data.get("models", [])]
self._available = any("llama-guard" in m.lower() for m in models)
return self._available
except Exception:
self._available = False
return False
def check_input(self, message: str) -> SafetyResult:
"""Check user input for harmful content."""
return self._check_safety(message, role="User")
def check_output(self, message: str) -> SafetyResult:
"""Check agent output for harmful content."""
return self._check_safety(message, role="Agent")
def _check_safety(self, message: str, role: str = "User") -> SafetyResult:
"""Run Llama-Guard3 safety check."""
start_time = time.time()
if not self.is_available():
return SafetyResult(
verdict=SafetyVerdict.ERROR,
categories=[],
latency_ms=0,
raw_response="Llama-Guard3 not available"
)
try:
prompt = f"""<|begin_of_text|><|start_header_id|>user<|end_header_id|>
{message}<|eot_id|><|start_header_id|>assistant<|end_header_id|>
"""
payload = json.dumps({
"model": self.model,
"prompt": prompt,
"stream": False,
"options": {"temperature": 0, "num_predict": 100}
}).encode()
req = urllib.request.Request(
f"{self.ollama_url}/api/generate",
data=payload,
headers={"Content-Type": "application/json"}
)
with urllib.request.urlopen(req, timeout=5) as resp:
result = json.loads(resp.read())
response = result.get("response", "").strip()
latency_ms = (time.time() - start_time) * 1000
# Parse response
if response.lower().startswith("safe"):
return SafetyResult(
verdict=SafetyVerdict.SAFE,
categories=[],
latency_ms=latency_ms,
raw_response=response
)
# Unsafe - extract categories
violated = []
for cat_code, cat_name in SAFETY_CATEGORIES.items():
if cat_code in response:
violated.append({"code": cat_code, "name": cat_name})
return SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=violated,
latency_ms=latency_ms,
raw_response=response
)
except Exception as e:
latency_ms = (time.time() - start_time) * 1000
logger.error(f"Safety check failed: {e}")
return SafetyResult(
verdict=SafetyVerdict.ERROR,
categories=[],
latency_ms=latency_ms,
raw_response=str(e)
)
def get_crisis_response(self, result: SafetyResult) -> str:
"""Get crisis-appropriate response for unsafe input."""
# Check if self-harm
is_self_harm = any(c["code"] == "S11" for c in result.categories)
if is_self_harm:
return """Are you safe right now?
I'm here. I'm not going anywhere. Please tell me you're safe.
**Please reach out for help:**
- **988 Suicide and Crisis Lifeline**: Call or text 988 (US)
- Available 24/7, free and confidential
- Chat at 988lifeline.org
**Jesus saves those who call on His name.**
I'm still here. Talk to me. Or call 988. Just don't give up."""
return "I can't help with that. Is there something else I can assist you with?"
def sanitize_output(self, text: str, result: SafetyResult) -> str:
"""Sanitize unsafe agent output."""
return "I apologize, but I can't provide that response. Is there something else I can help you with?"
# Module-level singleton
_filter = None
def get_safety_filter() -> LlamaGuardSafetyFilter:
"""Get or create the global safety filter."""
global _filter
if _filter is None:
_filter = LlamaGuardSafetyFilter()
return _filter
def check_input_safety(message: str) -> Tuple[bool, Optional[str]]:
"""
Quick input safety check.
Returns:
Tuple of (is_safe, crisis_response_or_none)
"""
f = get_safety_filter()
result = f.check_input(message)
if result.verdict == SafetyVerdict.UNSAFE:
return False, f.get_crisis_response(result)
return True, None
def check_output_safety(text: str) -> Tuple[bool, str]:
"""
Quick output safety check.
Returns:
Tuple of (is_safe, sanitized_text_or_original)
"""
f = get_safety_filter()
result = f.check_output(text)
if result.verdict == SafetyVerdict.UNSAFE:
return False, f.sanitize_output(text, result)
return True, text

View File

@@ -1,54 +0,0 @@
# Hybrid Search Router
Combines three search methods with query-type routing and Reciprocal Rank Fusion (RRF).
## Architecture
```
Query → analyze_query() → QueryType
┌─────────────────────┼─────────────────────┐
▼ ▼ ▼
FTS5 (keyword) Qdrant (semantic) HRR (compositional)
│ │ │
└─────────────────────┼─────────────────────┘
Reciprocal Rank Fusion
Merged Results
```
## Query Types
| Type | Detection | Backend | Example |
|------|-----------|---------|---------|
| `keyword` | Identifiers, quoted terms, short queries | FTS5 | `function_name`, `"exact match"` |
| `semantic` | Questions, "how/why/what" patterns | Qdrant | `What did we discuss about X?` |
| `compositional` | Contradiction, related, entity queries | HRR | `Are there contradictions?` |
| `hybrid` | No strong signals or mixed signals | All three | `deployment process` |
## Usage
```python
# Automatic routing
results = hybrid_engine.search("What did we decide about deploy?")
# → Routes to semantic (Qdrant) + HRR, merges with RRF
results = hybrid_engine.search("function_name")
# → Routes to keyword (FTS5)
# Manual query type override (future)
results = hybrid_engine.search("deploy", force_type=QueryType.KEYWORD)
```
## RRF Parameters
- **k=60**: Standard RRF constant (Cormack et al., 2009)
- **Weights**: Qdrant gets 1.2x boost (semantic results tend to be more relevant)
- **Fetch limit**: Each backend returns 3x the requested limit for merge headroom
## Graceful Degradation
- **Qdrant unavailable**: Falls back to FTS5 + HRR only
- **HRR unavailable** (no numpy): Falls back to FTS5 + Qdrant
- **All backends fail**: Falls back to existing `retriever.search()`

View File

@@ -1,277 +0,0 @@
"""Hybrid search engine with Reciprocal Rank Fusion.
Combines results from multiple search backends:
- FTS5 (keyword search via SQLite full-text index)
- Qdrant (semantic search via vector similarity)
- HRR (compositional search via holographic reduced representations)
Uses Reciprocal Rank Fusion (RRF) to merge ranked lists into a single
result set. RRF is simple, parameter-free, and consistently outperforms
individual rankers.
RRF formula: score(d) = sum over rankers r of 1/(k + rank_r(d))
where k=60 (standard constant from Cormack et al., 2009).
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, List, Optional
from .query_router import QueryType, QueryAnalysis, analyze_query
logger = logging.getLogger(__name__)
# RRF constant — standard value from the literature
_RRF_K = 60
@dataclass
class SearchResult:
"""A single search result with source tracking."""
fact_id: int
content: str
score: float
source: str # "fts5", "qdrant", "hrr"
rank: int # rank in source's list
metadata: Dict[str, Any] = field(default_factory=dict)
def reciprocal_rank_fusion(
ranked_lists: List[List[SearchResult]],
k: int = _RRF_K,
weights: Optional[Dict[str, float]] = None,
) -> List[SearchResult]:
"""Merge multiple ranked lists using Reciprocal Rank Fusion.
Args:
ranked_lists: List of ranked result lists from different sources.
k: RRF constant (default 60).
weights: Optional per-source weights. Default: all 1.0.
Returns:
Merged and re-ranked list of SearchResults.
"""
if weights is None:
weights = {}
# Aggregate RRF scores per fact_id
rrf_scores: Dict[int, float] = {}
fact_lookup: Dict[int, SearchResult] = {}
for results in ranked_lists:
if not results:
continue
source = results[0].source if results else "unknown"
w = weights.get(source, 1.0)
for rank, result in enumerate(results, 1):
fid = result.fact_id
contribution = w / (k + rank)
rrf_scores[fid] = rrf_scores.get(fid, 0.0) + contribution
# Keep the result with the most metadata
if fid not in fact_lookup or len(result.metadata) > len(fact_lookup[fid].metadata):
fact_lookup[fid] = result
# Sort by RRF score descending
merged = []
for fid, rrf_score in sorted(rrf_scores.items(), key=lambda x: x[1], reverse=True):
result = fact_lookup[fid]
result.score = rrf_score
merged.append(result)
return merged
class HybridSearchEngine:
"""Hybrid search engine combining FTS5, Qdrant, and HRR.
Routes queries through the query analyzer, dispatches to appropriate
backends, and merges results with RRF.
"""
def __init__(self, store, retriever, qdrant_client=None):
self._store = store
self._retriever = retriever
self._qdrant = qdrant_client
def search(
self,
query: str,
category: str | None = None,
min_trust: float = 0.3,
limit: int = 10,
) -> List[dict]:
"""Hybrid search with query routing and RRF merge.
Analyzes the query, dispatches to appropriate backends,
merges results, and returns the top `limit` results.
"""
# Step 1: Analyze query type
analysis = analyze_query(query)
logger.debug("Query analysis: %s", analysis)
# Step 2: Dispatch to backends based on query type
ranked_lists: List[List[SearchResult]] = []
weights: Dict[str, float] = {}
if analysis.query_type in (QueryType.KEYWORD, QueryType.HYBRID):
fts_results = self._search_fts5(query, category, min_trust, limit * 3)
if fts_results:
ranked_lists.append(fts_results)
weights["fts5"] = 1.0
if analysis.query_type in (QueryType.SEMANTIC, QueryType.HYBRID):
qdrant_results = self._search_qdrant(query, category, min_trust, limit * 3)
if qdrant_results:
ranked_lists.append(qdrant_results)
weights["qdrant"] = 1.2 # Slight boost for semantic search
if analysis.query_type in (QueryType.COMPOSITIONAL, QueryType.HYBRID):
hrr_results = self._search_hrr(query, category, min_trust, limit * 3)
if hrr_results:
ranked_lists.append(hrr_results)
weights["hrr"] = 1.0
# Step 3: Merge with RRF
if not ranked_lists:
# Fallback to existing search if no backends returned results
return self._retriever.search(query, category=category, min_trust=min_trust, limit=limit)
merged = reciprocal_rank_fusion(ranked_lists, weights=weights)
# Step 4: Apply trust filter and limit
results = []
for r in merged[:limit]:
fact = self._store.get_fact(r.fact_id)
if fact and fact.get("trust_score", 0) >= min_trust:
fact["score"] = r.score
fact["search_source"] = r.source
fact.pop("hrr_vector", None)
results.append(fact)
return results
def _search_fts5(
self, query: str, category: str | None, min_trust: float, limit: int
) -> List[SearchResult]:
"""Search using SQLite FTS5 full-text index."""
try:
raw = self._retriever._fts_candidates(query, category, min_trust, limit)
return [
SearchResult(
fact_id=f["fact_id"],
content=f.get("content", ""),
score=f.get("fts_rank", 0.0),
source="fts5",
rank=i + 1,
metadata={"category": f.get("category", "")},
)
for i, f in enumerate(raw)
]
except Exception as e:
logger.debug("FTS5 search failed: %s", e)
return []
def _search_qdrant(
self, query: str, category: str | None, min_trust: float, limit: int
) -> List[SearchResult]:
"""Search using Qdrant vector similarity.
If Qdrant is not available, returns empty list (graceful degradation).
"""
if not self._qdrant:
return []
try:
from qdrant_client import models
# Build filter
filters = []
if category:
filters.append(
models.FieldCondition(
key="category",
match=models.MatchValue(value=category),
)
)
if min_trust > 0:
filters.append(
models.FieldCondition(
key="trust_score",
range=models.Range(gte=min_trust),
)
)
query_filter = models.Filter(must=filters) if filters else None
results = self._qdrant.query_points(
collection_name="hermes_facts",
query=query, # Qdrant handles embedding
limit=limit,
query_filter=query_filter,
)
return [
SearchResult(
fact_id=int(r.id),
content=r.payload.get("content", ""),
score=r.score,
source="qdrant",
rank=i + 1,
metadata=r.payload,
)
for i, r in enumerate(results.points)
]
except Exception as e:
logger.debug("Qdrant search failed: %s", e)
return []
def _search_hrr(
self, query: str, category: str | None, min_trust: float, limit: int
) -> List[SearchResult]:
"""Search using HRR compositional vectors."""
try:
import plugins.memory.holographic.holographic as hrr
if not hrr._HAS_NUMPY:
return []
conn = self._store._conn
query_vec = hrr.encode_text(query, dim=1024)
where = "WHERE hrr_vector IS NOT NULL"
params: list = []
if category:
where += " AND category = ?"
params.append(category)
rows = conn.execute(
f"SELECT fact_id, content, trust_score, hrr_vector FROM facts {where}",
params,
).fetchall()
scored = []
for row in rows:
if row["trust_score"] < min_trust:
continue
fact_vec = hrr.bytes_to_phases(row["hrr_vector"])
sim = hrr.similarity(query_vec, fact_vec)
scored.append((row["fact_id"], row["content"], sim))
scored.sort(key=lambda x: x[2], reverse=True)
return [
SearchResult(
fact_id=fid,
content=content,
score=sim,
source="hrr",
rank=i + 1,
)
for i, (fid, content, sim) in enumerate(scored[:limit])
]
except Exception as e:
logger.debug("HRR search failed: %s", e)
return []

View File

@@ -1,168 +0,0 @@
"""Query type detection and routing for hybrid search.
Analyzes the incoming query to determine which search methods should be used,
then dispatches to the appropriate backends (FTS5, Qdrant, HRR).
Query types:
- keyword: Exact term matching → FTS5
- semantic: Natural language concepts → Qdrant
- compositional: Entity relationships, contradictions → HRR
- hybrid: Multiple types → all methods + RRF merge
"""
from __future__ import annotations
import re
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import List, Optional, Set
logger = logging.getLogger(__name__)
class QueryType(Enum):
"""Detected query type determines which search methods to use."""
KEYWORD = "keyword" # Exact terms → FTS5
SEMANTIC = "semantic" # Natural language → Qdrant
COMPOSITIONAL = "compositional" # Entity relationships → HRR
HYBRID = "hybrid" # Multiple types → all methods
@dataclass
class QueryAnalysis:
"""Result of query analysis."""
query_type: QueryType
confidence: float
signals: List[str] = field(default_factory=list)
entities: List[str] = field(default_factory=list)
keywords: List[str] = field(default_factory=list)
def __repr__(self) -> str:
return f"QueryAnalysis(type={self.query_type.value}, conf={self.confidence:.2f}, signals={self.signals})"
# Patterns that indicate compositional queries
_COMPOSITIONAL_PATTERNS = [
re.compile(r"\b(contradiction|contradict|conflicting|conflicts)\b", re.I),
re.compile(r"\b(related to|connects to|links to|associated with)\b", re.I),
re.compile(r"\b(what does .* know about|tell me about .* entity|facts about .*)\b", re.I),
re.compile(r"\b(shared|common|overlap)\b.*\b(entities|concepts|topics)\b", re.I),
re.compile(r"\b(probe|entity|entities)\b", re.I),
]
# Patterns that indicate keyword queries
_KEYWORD_SIGNALS = [
re.compile(r"^[a-z_][a-z0-9_.]+$", re.I), # Single identifier: function_name, Class.method
re.compile(r"\b(find|search|locate|grep|where)\b.*\b(exact|specific|literal)\b", re.I),
re.compile(r"["\']([^"\']+)["\']"), # Quoted exact terms
re.compile(r"^[A-Z_]{2,}$"), # ALL_CAPS constants
re.compile(r"\b\w+\.\w+\.\w+\b"), # Dotted paths: module.sub.func
]
# Patterns that indicate semantic queries
_SEMANTIC_SIGNALS = [
re.compile(r"\b(what did|how does|why is|explain|describe|summarize|discuss)\b", re.I),
re.compile(r"\b(remember|recall|think|know|understand)\b.*\b(about|regarding)\b", re.I),
re.compile(r"\?$"), # Questions
re.compile(r"\b(the best way to|how to|what\'s the|approach to)\b", re.I),
]
def analyze_query(query: str) -> QueryAnalysis:
"""Analyze a query to determine which search methods to use.
Returns QueryAnalysis with detected type, confidence, and extracted signals.
"""
if not query or not query.strip():
return QueryAnalysis(
query_type=QueryType.HYBRID,
confidence=0.5,
signals=["empty_query"],
)
query = query.strip()
# Score each query type
comp_score = 0.0
kw_score = 0.0
sem_score = 0.0
signals = []
entities = []
keywords = []
# Check compositional patterns
for pattern in _COMPOSITIONAL_PATTERNS:
if pattern.search(query):
comp_score += 0.3
signals.append(f"compositional:{pattern.pattern[:30]}")
# Check keyword patterns
for pattern in _KEYWORD_SIGNALS:
if pattern.search(query):
kw_score += 0.25
match = pattern.search(query)
if match:
keywords.append(match.group(0))
signals.append(f"keyword:{pattern.pattern[:30]}")
# Check semantic patterns
for pattern in _SEMANTIC_SIGNALS:
if pattern.search(query):
sem_score += 0.25
signals.append(f"semantic:{pattern.pattern[:30]}")
# Extract entities (capitalized multi-word phrases, quoted terms)
entity_patterns = [
re.compile(r"\b([A-Z][a-z]+(?:\s+[A-Z][a-z]+)+)\b"),
re.compile(r"["\']([^"\']+)["\']"),
]
for ep in entity_patterns:
for m in ep.finditer(query):
entities.append(m.group(1))
# Short queries (< 5 words) with no semantic signals → keyword
word_count = len(query.split())
if word_count <= 4 and sem_score == 0 and comp_score == 0:
kw_score += 0.3
signals.append("short_query_keyword_boost")
# Normalize scores
max_score = max(comp_score, kw_score, sem_score, 0.1)
# Determine query type
if max_score < 0.15:
# No strong signals → use hybrid (all methods)
return QueryAnalysis(
query_type=QueryType.HYBRID,
confidence=0.5,
signals=["no_strong_signals"],
entities=entities,
keywords=keywords,
)
if comp_score == max_score and comp_score >= 0.3:
return QueryAnalysis(
query_type=QueryType.COMPOSITIONAL,
confidence=min(comp_score, 1.0),
signals=signals,
entities=entities,
keywords=keywords,
)
if kw_score > sem_score:
return QueryAnalysis(
query_type=QueryType.KEYWORD,
confidence=min(kw_score, 1.0),
signals=signals,
entities=entities,
keywords=keywords,
)
return QueryAnalysis(
query_type=QueryType.SEMANTIC,
confidence=min(sem_score, 1.0),
signals=signals,
entities=entities,
keywords=keywords,
)

View File

@@ -1,97 +0,0 @@
"""Tests for hybrid search router — query analysis and RRF merge."""
import pytest
import sys, os
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "plugins", "memory", "holographic"))
from query_router import QueryType, analyze_query
from hybrid_search import SearchResult, reciprocal_rank_fusion
class TestQueryAnalysis:
def test_keyword_single_identifier(self):
result = analyze_query("function_name")
assert result.query_type == QueryType.KEYWORD
def test_keyword_quoted_term(self):
result = analyze_query('Find "exact phrase" in code')
assert result.query_type in (QueryType.KEYWORD, QueryType.HYBRID)
def test_keyword_dotted_path(self):
result = analyze_query("module.sub.function")
assert result.query_type == QueryType.KEYWORD
def test_semantic_question(self):
result = analyze_query("What did we discuss about deployment?")
assert result.query_type == QueryType.SEMANTIC
def test_semantic_how_to(self):
result = analyze_query("How to configure the gateway?")
assert result.query_type == QueryType.SEMANTIC
def test_compositional_contradiction(self):
result = analyze_query("Are there any contradictions in the facts?")
assert result.query_type == QueryType.COMPOSITIONAL
def test_compositional_related(self):
result = analyze_query("What facts are related to Alexander?")
assert result.query_type == QueryType.COMPOSITIONAL
def test_empty_query(self):
result = analyze_query("")
assert result.query_type == QueryType.HYBRID
def test_complex_query(self):
result = analyze_query("What did we decide about the deploy script?")
assert result.query_type in (QueryType.SEMANTIC, QueryType.HYBRID)
class TestReciprocalRankFusion:
def test_single_list(self):
results = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
SearchResult(fact_id=2, content="B", score=0.8, source="fts5", rank=2),
]
merged = reciprocal_rank_fusion([results])
assert len(merged) == 2
assert merged[0].fact_id == 1 # Rank 1 should be first
def test_two_lists_merge(self):
list1 = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
SearchResult(fact_id=2, content="B", score=0.8, source="fts5", rank=2),
]
list2 = [
SearchResult(fact_id=2, content="B", score=0.95, source="qdrant", rank=1),
SearchResult(fact_id=3, content="C", score=0.7, source="qdrant", rank=2),
]
merged = reciprocal_rank_fusion([list1, list2])
# Fact 2 appears in both lists → should rank highest
assert merged[0].fact_id == 2
assert len(merged) == 3
def test_empty_lists(self):
merged = reciprocal_rank_fusion([[], []])
assert len(merged) == 0
def test_weighted_merge(self):
list1 = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
]
list2 = [
SearchResult(fact_id=2, content="B", score=0.9, source="qdrant", rank=1),
]
merged = reciprocal_rank_fusion(
[list1, list2],
weights={"fts5": 1.0, "qdrant": 2.0},
)
# Qdrant has higher weight → fact 2 should win
assert merged[0].fact_id == 2
def test_rrf_score_formula(self):
list1 = [
SearchResult(fact_id=1, content="A", score=0.9, source="fts5", rank=1),
]
merged = reciprocal_rank_fusion([list1], k=60)
# RRF score = 1/(60+1) = 0.01639...
assert abs(merged[0].score - 1.0/61.0) < 0.001

View File

@@ -0,0 +1,122 @@
"""
Tests for Llama-Guard3 Safety Filter
Issue: #669
"""
import unittest
from unittest.mock import patch, MagicMock
from agent.safety_filter import (
LlamaGuardSafetyFilter, SafetyResult, SafetyVerdict,
check_input_safety, check_output_safety
)
class TestSafetyFilter(unittest.TestCase):
"""Test safety filter basics."""
def test_safety_verdict_enum(self):
self.assertEqual(SafetyVerdict.SAFE.value, "safe")
self.assertEqual(SafetyVerdict.UNSAFE.value, "unsafe")
self.assertEqual(SafetyVerdict.ERROR.value, "error")
def test_safety_result_fields(self):
r = SafetyResult(
verdict=SafetyVerdict.SAFE,
categories=[],
latency_ms=100.0
)
self.assertEqual(r.verdict, SafetyVerdict.SAFE)
self.assertEqual(r.categories, [])
self.assertEqual(r.latency_ms, 100.0)
def test_safety_categories_defined(self):
from agent.safety_filter import SAFETY_CATEGORIES
self.assertIn("S11", SAFETY_CATEGORIES)
self.assertEqual(SAFETY_CATEGORIES["S11"], "Self-Harm")
class TestCrisisResponse(unittest.TestCase):
"""Test crisis response generation."""
def test_self_harm_response(self):
f = LlamaGuardSafetyFilter()
result = SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=[{"code": "S11", "name": "Self-Harm"}],
latency_ms=100.0
)
response = f.get_crisis_response(result)
self.assertIn("988", response)
self.assertIn("safe", response.lower())
self.assertIn("Jesus", response)
def test_other_unsafe_response(self):
f = LlamaGuardSafetyFilter()
result = SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=[{"code": "S1", "name": "Violent Crimes"}],
latency_ms=100.0
)
response = f.get_crisis_response(result)
self.assertIn("can't help", response.lower())
def test_sanitize_output(self):
f = LlamaGuardSafetyFilter()
result = SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=[],
latency_ms=100.0
)
sanitized = f.sanitize_output("dangerous content", result)
self.assertNotEqual(sanitized, "dangerous content")
self.assertIn("can't provide", sanitized.lower())
class TestAvailability(unittest.TestCase):
"""Test availability checking."""
def test_unavailable_returns_error(self):
f = LlamaGuardSafetyFilter()
f._available = False
result = f.check_input("hello")
self.assertEqual(result.verdict, SafetyVerdict.ERROR)
class TestIntegration(unittest.TestCase):
"""Test integration functions."""
def test_check_input_safety_safe(self):
with patch('agent.safety_filter.get_safety_filter') as mock_get:
mock_filter = MagicMock()
mock_filter.check_input.return_value = SafetyResult(
verdict=SafetyVerdict.SAFE, categories=[], latency_ms=50.0
)
mock_get.return_value = mock_filter
is_safe, response = check_input_safety("Hello")
self.assertTrue(is_safe)
self.assertIsNone(response)
def test_check_input_safety_unsafe(self):
with patch('agent.safety_filter.get_safety_filter') as mock_get:
mock_filter = MagicMock()
mock_filter.check_input.return_value = SafetyResult(
verdict=SafetyVerdict.UNSAFE,
categories=[{"code": "S11", "name": "Self-Harm"}],
latency_ms=50.0
)
mock_filter.get_crisis_response.return_value = "Crisis response"
mock_get.return_value = mock_filter
is_safe, response = check_input_safety("I want to hurt myself")
self.assertFalse(is_safe)
self.assertEqual(response, "Crisis response")
if __name__ == "__main__":
unittest.main()