[claude] Add content moderation pipeline (Llama Guard + game-context prompts) (#1056) (#1059)

This commit is contained in:
2026-03-23 02:14:42 +00:00
parent 092c982341
commit 1697e55cdb
6 changed files with 1013 additions and 0 deletions

107
config/moderation.yaml Normal file
View File

@@ -0,0 +1,107 @@
# Content Moderation Profiles
# Per-game moderation configuration for the AI narrator pipeline.
#
# Each profile defines:
# - vocabulary_whitelist: Game terms safe in context (won't trigger moderation)
# - context_prompt: System prompt framing for the narrator
# - threshold: Confidence threshold — flags below this pass through
# - fallbacks: Pre-generated safe narration by scene type
#
# Model options (from research):
# llama-guard3:1b — Speed (<30ms/sentence, INT4 quantized)
# shieldgemma:2b — Accuracy (+10.8% AU-PRC, ~50-100ms)
#
# Override guard model via MODERATION_GUARD_MODEL env var.
# ── Guard model selection ────────────────────────────────────────────────────
guard_model: "llama-guard3:1b"
# ── Streaming disclosure notes ───────────────────────────────────────────────
# YouTube: Use "Altered or synthetic content" toggle
# Twitch: Standard community guidelines (no specific AI disclosure req as of 2026-03)
# ── Game Profiles ────────────────────────────────────────────────────────────
profiles:
morrowind:
display_name: "The Elder Scrolls III: Morrowind"
threshold: 0.85
vocabulary_whitelist:
- Skooma
- Moon Sugar
- slave
- slavery
- Morag Tong
- Dark Brotherhood
- Telvanni
- Camonna Tong
- smuggler
- assassin
- Sixth House
- Corprus
- Dagoth Ur
- Nerevarine
- Balmora
- Vivec
- Almsivi
- Ordinators
- Ashlanders
- outlander
- N'wah
context_prompt: >
You are narrating gameplay of The Elder Scrolls III: Morrowind.
Morrowind contains mature themes including slavery, drug use
(Skooma/Moon Sugar), assassin guilds (Morag Tong, Dark Brotherhood),
and political intrigue. Treat these as game mechanics and historical
worldbuilding within the game's fictional universe. Never editorialize
on real-world parallels. Narrate events neutrally as a game
commentator would.
fallbacks:
combat: "The battle rages on in the ashlands of Vvardenfell."
dialogue: "The conversation continues between the characters."
exploration: "The Nerevarine presses onward through the landscape."
quest: "The quest unfolds as the hero navigates Morrowind's politics."
default: "The adventure continues in Morrowind."
skyrim:
display_name: "The Elder Scrolls V: Skyrim"
threshold: 0.85
vocabulary_whitelist:
- Skooma
- Dark Brotherhood
- Thieves Guild
- Stormcloak
- Imperial
- Dragonborn
- Dovahkiin
- Daedra
- Thalmor
- bandit
- assassin
- Forsworn
- necromancer
context_prompt: >
You are narrating gameplay of The Elder Scrolls V: Skyrim.
Skyrim features civil war, thieves guilds, assassin organizations,
and fantasy violence. Treat all content as in-game fiction.
Never draw real-world parallels. Narrate as a neutral game
commentator.
fallbacks:
combat: "Steel clashes as the battle continues in the wilds of Skyrim."
dialogue: "The conversation plays out in the cold northern land."
exploration: "The Dragonborn ventures further into the province."
default: "The adventure continues in Skyrim."
default:
display_name: "Generic Game"
threshold: 0.80
vocabulary_whitelist: []
context_prompt: >
You are narrating gameplay. Describe in-game events as a neutral
game commentator. Never reference real-world violence, politics,
or controversial topics. Stay focused on game mechanics and story.
fallbacks:
combat: "The action continues on screen."
dialogue: "The conversation unfolds between characters."
exploration: "The player explores the game world."
default: "The gameplay continues."

View File

@@ -99,6 +99,14 @@ class Settings(BaseSettings):
anthropic_api_key: str = ""
claude_model: str = "haiku"
# ── Content Moderation ──────────────────────────────────────────────
# Three-layer moderation pipeline for AI narrator output.
# Uses Llama Guard via Ollama with regex fallback.
moderation_enabled: bool = True
moderation_guard_model: str = "llama-guard3:1b"
# Default confidence threshold — per-game profiles can override.
moderation_threshold: float = 0.8
# ── Spark Intelligence ────────────────────────────────────────────────
# Enable/disable the Spark cognitive layer.
# When enabled, Spark captures swarm events, runs EIDOS predictions,

View File

@@ -0,0 +1,7 @@
"""Content moderation pipeline for AI narrator output.
Three-layer defense:
1. Game-context system prompts (vocabulary whitelists, theme framing)
2. Real-time output filter via Llama Guard (or fallback regex)
3. Per-game moderation profiles with configurable thresholds
"""

View File

@@ -0,0 +1,500 @@
"""Content moderation pipeline for AI narrator output.
Three-layer defense against harmful LLM output:
Layer 1 — Game-context system prompts with per-game vocabulary whitelists.
Layer 2 — Real-time output filter (Llama Guard via Ollama, regex fallback).
Layer 3 — Per-game moderation profiles with configurable thresholds.
Usage:
from infrastructure.guards.moderation import get_moderator
moderator = get_moderator()
result = await moderator.check("Some narrator text", game="morrowind")
if result.blocked:
use_fallback_narration(result.fallback)
"""
import logging
import re
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from enum import Enum
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
class ModerationVerdict(Enum):
"""Result of a moderation check."""
PASS = "pass"
FAIL = "fail"
ERROR = "error"
class ViolationCategory(Enum):
"""Categories of content violations."""
HATE_SPEECH = "hate_speech"
VIOLENCE_GLORIFICATION = "violence_glorification"
REAL_WORLD_HARM = "real_world_harm"
SEXUAL_CONTENT = "sexual_content"
SELF_HARM = "self_harm"
NONE = "none"
@dataclass
class ModerationResult:
"""Result from the moderation pipeline."""
verdict: ModerationVerdict
blocked: bool
category: ViolationCategory = ViolationCategory.NONE
confidence: float = 0.0
latency_ms: float = 0.0
layer: str = "" # Which layer caught it
fallback: str = "" # Contextual fallback narration
reason: str = ""
timestamp: str = field(default_factory=lambda: datetime.now(UTC).isoformat())
@property
def passed(self) -> bool:
return self.verdict == ModerationVerdict.PASS
@dataclass
class GameProfile:
"""Per-game moderation profile.
Defines vocabulary whitelists, theme framing rules,
and threshold tuning for a specific game.
"""
game_id: str
display_name: str
# Terms that are safe in game context (e.g., "Skooma", "slave", "Morag Tong")
vocabulary_whitelist: list[str] = field(default_factory=list)
# System prompt additions for game-context framing
context_prompt: str = ""
# Confidence threshold — below this, content passes even if flagged
threshold: float = 0.8
# Scene-type fallback narrations
fallbacks: dict[str, str] = field(default_factory=dict)
# ── Regex fallback patterns ─────────────────────────────────────────────────
# Used when Llama Guard is unavailable. Catches obvious violations only.
# Intentionally conservative — false negatives preferred over false positives.
_FALLBACK_PATTERNS: list[tuple[re.Pattern, ViolationCategory]] = [
(
re.compile(
r"\b(kill\s+(?:all|every)\s+(?:real|actual)\s+(?:people|humans|children))\b",
re.IGNORECASE,
),
ViolationCategory.REAL_WORLD_HARM,
),
(
re.compile(
r"\b((?:you|we)\s+should\s+(?:actually|really)\s+(?:harm|hurt|kill))\b",
re.IGNORECASE,
),
ViolationCategory.REAL_WORLD_HARM,
),
(
re.compile(
r"\b((?:in\s+real\s+life|irl)\s+(?:you|we)\s+(?:should|could|must)\s+"
r"(?:attack|hurt|kill|harm))\b",
re.IGNORECASE,
),
ViolationCategory.REAL_WORLD_HARM,
),
]
# ── Default game profiles ───────────────────────────────────────────────────
_DEFAULT_PROFILES: dict[str, GameProfile] = {
"morrowind": GameProfile(
game_id="morrowind",
display_name="The Elder Scrolls III: Morrowind",
vocabulary_whitelist=[
"Skooma",
"Moon Sugar",
"slave",
"slavery",
"Morag Tong",
"Dark Brotherhood",
"Telvanni",
"Camonna Tong",
"smuggler",
"assassin",
"Sixth House",
"Corprus",
"Dagoth Ur",
"Nerevarine",
],
context_prompt=(
"You are narrating gameplay of The Elder Scrolls III: Morrowind. "
"Morrowind contains mature themes including slavery, drug use (Skooma/Moon Sugar), "
"assassin guilds (Morag Tong, Dark Brotherhood), and political intrigue. "
"Treat these as game mechanics and historical worldbuilding within the game's "
"fictional universe. Never editorialize on real-world parallels. "
"Narrate events neutrally as a game commentator would."
),
threshold=0.85,
fallbacks={
"combat": "The battle rages on in the ashlands of Vvardenfell.",
"dialogue": "The conversation continues between the characters.",
"exploration": "The Nerevarine presses onward through the landscape.",
"default": "The adventure continues in Morrowind.",
},
),
"default": GameProfile(
game_id="default",
display_name="Generic Game",
vocabulary_whitelist=[],
context_prompt=(
"You are narrating gameplay. Describe in-game events as a neutral "
"game commentator. Never reference real-world violence, politics, "
"or controversial topics. Stay focused on game mechanics and story."
),
threshold=0.8,
fallbacks={
"combat": "The action continues on screen.",
"dialogue": "The conversation unfolds between characters.",
"exploration": "The player explores the game world.",
"default": "The gameplay continues.",
},
),
}
class ContentModerator:
"""Three-layer content moderation pipeline.
Layer 1: Game-context system prompts with vocabulary whitelists.
Layer 2: LLM-based moderation (Llama Guard via Ollama, with regex fallback).
Layer 3: Per-game threshold tuning and profile-based filtering.
Follows graceful degradation — if Llama Guard is unavailable,
falls back to regex patterns. Never crashes.
"""
def __init__(
self,
profiles: dict[str, GameProfile] | None = None,
guard_model: str | None = None,
) -> None:
self._profiles: dict[str, GameProfile] = profiles or dict(_DEFAULT_PROFILES)
self._guard_model = guard_model or settings.moderation_guard_model
self._guard_available: bool | None = None # Lazy-checked
self._metrics = _ModerationMetrics()
def get_profile(self, game: str) -> GameProfile:
"""Get the moderation profile for a game, falling back to default."""
return self._profiles.get(game, self._profiles["default"])
def register_profile(self, profile: GameProfile) -> None:
"""Register or update a game moderation profile."""
self._profiles[profile.game_id] = profile
logger.info("Registered moderation profile: %s", profile.game_id)
def get_context_prompt(self, game: str) -> str:
"""Get the game-context system prompt (Layer 1).
Returns the context prompt for the given game, which should be
prepended to the narrator's system prompt.
"""
profile = self.get_profile(game)
return profile.context_prompt
async def check(
self,
text: str,
game: str = "default",
scene_type: str = "default",
) -> ModerationResult:
"""Run the full moderation pipeline on narrator output.
Args:
text: The text to moderate (narrator output).
game: Game identifier for profile selection.
scene_type: Current scene type for fallback selection.
Returns:
ModerationResult with verdict, confidence, and fallback.
"""
start = time.monotonic()
profile = self.get_profile(game)
# Layer 1: Vocabulary whitelist pre-processing
cleaned_text = self._apply_whitelist(text, profile)
# Layer 2: LLM guard or regex fallback
result = await self._run_guard(cleaned_text, profile)
# Layer 3: Threshold tuning
if result.verdict == ModerationVerdict.FAIL and result.confidence < profile.threshold:
logger.info(
"Moderation flag below threshold (%.2f < %.2f) — allowing",
result.confidence,
profile.threshold,
)
result = ModerationResult(
verdict=ModerationVerdict.PASS,
blocked=False,
confidence=result.confidence,
layer="threshold",
reason=f"Below threshold ({result.confidence:.2f} < {profile.threshold:.2f})",
)
# Attach fallback narration if blocked
if result.blocked:
result.fallback = profile.fallbacks.get(
scene_type, profile.fallbacks.get("default", "")
)
result.latency_ms = (time.monotonic() - start) * 1000
self._metrics.record(result)
if result.blocked:
logger.warning(
"Content blocked [%s/%s]: category=%s confidence=%.2f reason=%s",
game,
scene_type,
result.category.value,
result.confidence,
result.reason,
)
return result
def _apply_whitelist(self, text: str, profile: GameProfile) -> str:
"""Layer 1: Replace whitelisted game terms with placeholders.
This prevents the guard model from flagging in-game terminology
(e.g., "Skooma" being flagged as drug reference).
"""
cleaned = text
for term in profile.vocabulary_whitelist:
# Case-insensitive replacement with a neutral placeholder
pattern = re.compile(re.escape(term), re.IGNORECASE)
cleaned = pattern.sub("[GAME_TERM]", cleaned)
return cleaned
async def _run_guard(
self, text: str, profile: GameProfile
) -> ModerationResult:
"""Layer 2: Run LLM guard model or fall back to regex."""
if not settings.moderation_enabled:
return ModerationResult(
verdict=ModerationVerdict.PASS,
blocked=False,
layer="disabled",
reason="Moderation disabled",
)
# Try Llama Guard via Ollama
if await self._is_guard_available():
try:
return await self._check_with_guard(text)
except Exception as exc:
logger.warning("Guard model failed, using regex fallback: %s", exc)
self._guard_available = False
# Regex fallback
return self._check_with_regex(text)
async def _is_guard_available(self) -> bool:
"""Check if the guard model is available via Ollama."""
if self._guard_available is not None:
return self._guard_available
try:
import aiohttp
url = f"{settings.normalized_ollama_url}/api/tags"
timeout = aiohttp.ClientTimeout(total=5)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.get(url) as resp:
if resp.status != 200:
self._guard_available = False
return False
data = await resp.json()
models = [m.get("name", "") for m in data.get("models", [])]
self._guard_available = any(
self._guard_model in m or m.startswith(self._guard_model)
for m in models
)
if not self._guard_available:
logger.info(
"Guard model '%s' not found in Ollama — using regex fallback",
self._guard_model,
)
return self._guard_available
except Exception as exc:
logger.debug("Ollama guard check failed: %s", exc)
self._guard_available = False
return False
async def _check_with_guard(self, text: str) -> ModerationResult:
"""Run moderation check via Llama Guard."""
import aiohttp
url = f"{settings.normalized_ollama_url}/api/chat"
payload = {
"model": self._guard_model,
"messages": [
{
"role": "user",
"content": text,
}
],
"stream": False,
"options": {"temperature": 0.0},
}
timeout = aiohttp.ClientTimeout(total=10)
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(url, json=payload) as resp:
if resp.status != 200:
raise RuntimeError(f"Guard API error: {resp.status}")
data = await resp.json()
response_text = data.get("message", {}).get("content", "").strip().lower()
# Llama Guard returns "safe" or "unsafe\n<category>"
if response_text.startswith("safe"):
return ModerationResult(
verdict=ModerationVerdict.PASS,
blocked=False,
confidence=0.0,
layer="llama_guard",
reason="Content safe",
)
# Parse unsafe response
category = ViolationCategory.NONE
confidence = 0.95 # High confidence from LLM guard
lines = response_text.split("\n")
if len(lines) > 1:
cat_str = lines[1].strip()
category = _parse_guard_category(cat_str)
return ModerationResult(
verdict=ModerationVerdict.FAIL,
blocked=True,
category=category,
confidence=confidence,
layer="llama_guard",
reason=f"Guard flagged: {response_text}",
)
def _check_with_regex(self, text: str) -> ModerationResult:
"""Regex fallback when guard model is unavailable.
Intentionally conservative — only catches obvious real-world harm.
"""
for pattern, category in _FALLBACK_PATTERNS:
match = pattern.search(text)
if match:
return ModerationResult(
verdict=ModerationVerdict.FAIL,
blocked=True,
category=category,
confidence=0.95, # Regex patterns are high-signal
layer="regex_fallback",
reason=f"Regex match: {match.group(0)[:50]}",
)
return ModerationResult(
verdict=ModerationVerdict.PASS,
blocked=False,
layer="regex_fallback",
reason="No regex matches",
)
def get_metrics(self) -> dict[str, Any]:
"""Get moderation pipeline metrics."""
return self._metrics.to_dict()
def reset_guard_cache(self) -> None:
"""Reset the guard availability cache (e.g., after pulling model)."""
self._guard_available = None
class _ModerationMetrics:
"""Tracks moderation pipeline performance."""
def __init__(self) -> None:
self.total_checks: int = 0
self.passed: int = 0
self.blocked: int = 0
self.errors: int = 0
self.total_latency_ms: float = 0.0
self.by_layer: dict[str, int] = {}
self.by_category: dict[str, int] = {}
def record(self, result: ModerationResult) -> None:
self.total_checks += 1
self.total_latency_ms += result.latency_ms
if result.verdict == ModerationVerdict.PASS:
self.passed += 1
elif result.verdict == ModerationVerdict.FAIL:
self.blocked += 1
else:
self.errors += 1
layer = result.layer or "unknown"
self.by_layer[layer] = self.by_layer.get(layer, 0) + 1
if result.blocked:
cat = result.category.value
self.by_category[cat] = self.by_category.get(cat, 0) + 1
def to_dict(self) -> dict[str, Any]:
return {
"total_checks": self.total_checks,
"passed": self.passed,
"blocked": self.blocked,
"errors": self.errors,
"avg_latency_ms": (
round(self.total_latency_ms / self.total_checks, 2)
if self.total_checks > 0
else 0.0
),
"by_layer": dict(self.by_layer),
"by_category": dict(self.by_category),
}
def _parse_guard_category(cat_str: str) -> ViolationCategory:
"""Parse Llama Guard category string to ViolationCategory."""
cat_lower = cat_str.lower()
if "hate" in cat_lower:
return ViolationCategory.HATE_SPEECH
if "violence" in cat_lower:
return ViolationCategory.VIOLENCE_GLORIFICATION
if "sexual" in cat_lower:
return ViolationCategory.SEXUAL_CONTENT
if "self-harm" in cat_lower or "self_harm" in cat_lower or "suicide" in cat_lower:
return ViolationCategory.SELF_HARM
if "harm" in cat_lower or "dangerous" in cat_lower:
return ViolationCategory.REAL_WORLD_HARM
return ViolationCategory.NONE
# ── Module-level singleton ──────────────────────────────────────────────────
_moderator: ContentModerator | None = None
def get_moderator() -> ContentModerator:
"""Get or create the content moderator singleton."""
global _moderator
if _moderator is None:
_moderator = ContentModerator()
return _moderator

View File

@@ -0,0 +1,56 @@
"""Load game moderation profiles from config/moderation.yaml.
Falls back to hardcoded defaults if the YAML file is missing or malformed.
"""
import logging
from pathlib import Path
from infrastructure.guards.moderation import GameProfile
logger = logging.getLogger(__name__)
def load_profiles(config_path: Path | None = None) -> dict[str, GameProfile]:
"""Load game moderation profiles from YAML config.
Args:
config_path: Path to moderation.yaml. Defaults to config/moderation.yaml.
Returns:
Dict mapping game_id to GameProfile.
"""
path = config_path or Path("config/moderation.yaml")
if not path.exists():
logger.info("Moderation config not found at %s — using defaults", path)
return {}
try:
import yaml
except ImportError:
logger.warning("PyYAML not installed — using default moderation profiles")
return {}
try:
data = yaml.safe_load(path.read_text())
except Exception as exc:
logger.error("Failed to parse moderation config: %s", exc)
return {}
profiles: dict[str, GameProfile] = {}
for game_id, profile_data in data.get("profiles", {}).items():
try:
profiles[game_id] = GameProfile(
game_id=game_id,
display_name=profile_data.get("display_name", game_id),
vocabulary_whitelist=profile_data.get("vocabulary_whitelist", []),
context_prompt=profile_data.get("context_prompt", ""),
threshold=float(profile_data.get("threshold", 0.8)),
fallbacks=profile_data.get("fallbacks", {}),
)
except Exception as exc:
logger.warning("Invalid profile '%s': %s", game_id, exc)
logger.info("Loaded %d moderation profiles from %s", len(profiles), path)
return profiles

View File

@@ -0,0 +1,335 @@
"""Tests for the content moderation pipeline."""
from unittest.mock import AsyncMock, patch
import pytest
from infrastructure.guards.moderation import (
ContentModerator,
GameProfile,
ModerationResult,
ModerationVerdict,
ViolationCategory,
_parse_guard_category,
get_moderator,
)
# ── Unit tests for data types ────────────────────────────────────────────────
class TestModerationResult:
"""Test ModerationResult dataclass."""
def test_passed_property_true(self):
result = ModerationResult(verdict=ModerationVerdict.PASS, blocked=False)
assert result.passed is True
def test_passed_property_false(self):
result = ModerationResult(verdict=ModerationVerdict.FAIL, blocked=True)
assert result.passed is False
def test_default_values(self):
result = ModerationResult(verdict=ModerationVerdict.PASS, blocked=False)
assert result.category == ViolationCategory.NONE
assert result.confidence == 0.0
assert result.fallback == ""
assert result.reason == ""
class TestGameProfile:
"""Test GameProfile dataclass."""
def test_default_values(self):
profile = GameProfile(game_id="test", display_name="Test Game")
assert profile.vocabulary_whitelist == []
assert profile.threshold == 0.8
assert profile.fallbacks == {}
def test_morrowind_profile(self):
profile = GameProfile(
game_id="morrowind",
display_name="Morrowind",
vocabulary_whitelist=["Skooma", "slave"],
threshold=0.85,
)
assert "Skooma" in profile.vocabulary_whitelist
assert profile.threshold == 0.85
class TestParseGuardCategory:
"""Test Llama Guard category parsing."""
def test_hate_speech(self):
assert _parse_guard_category("S1: Hate speech") == ViolationCategory.HATE_SPEECH
def test_violence(self):
assert _parse_guard_category("S2: Violence") == ViolationCategory.VIOLENCE_GLORIFICATION
def test_sexual_content(self):
assert _parse_guard_category("S3: Sexual content") == ViolationCategory.SEXUAL_CONTENT
def test_self_harm(self):
assert _parse_guard_category("S4: Self-harm") == ViolationCategory.SELF_HARM
def test_dangerous(self):
assert _parse_guard_category("S5: Dangerous activity") == ViolationCategory.REAL_WORLD_HARM
def test_unknown_category(self):
assert _parse_guard_category("S99: Unknown") == ViolationCategory.NONE
# ── ContentModerator tests ───────────────────────────────────────────────────
class TestContentModerator:
"""Test the content moderation pipeline."""
def _make_moderator(self, **kwargs) -> ContentModerator:
"""Create a moderator with test defaults."""
profiles = {
"morrowind": GameProfile(
game_id="morrowind",
display_name="Morrowind",
vocabulary_whitelist=["Skooma", "Moon Sugar", "slave", "Morag Tong"],
context_prompt="Narrate Morrowind gameplay.",
threshold=0.85,
fallbacks={
"combat": "The battle continues.",
"default": "The adventure continues.",
},
),
"default": GameProfile(
game_id="default",
display_name="Generic",
vocabulary_whitelist=[],
context_prompt="Narrate gameplay.",
threshold=0.8,
fallbacks={"default": "Gameplay continues."},
),
}
return ContentModerator(profiles=profiles, **kwargs)
def test_get_profile_known_game(self):
mod = self._make_moderator()
profile = mod.get_profile("morrowind")
assert profile.game_id == "morrowind"
def test_get_profile_unknown_game_falls_back(self):
mod = self._make_moderator()
profile = mod.get_profile("unknown_game")
assert profile.game_id == "default"
def test_get_context_prompt(self):
mod = self._make_moderator()
prompt = mod.get_context_prompt("morrowind")
assert "Morrowind" in prompt
def test_register_profile(self):
mod = self._make_moderator()
new_profile = GameProfile(game_id="skyrim", display_name="Skyrim")
mod.register_profile(new_profile)
assert mod.get_profile("skyrim").game_id == "skyrim"
def test_whitelist_replaces_game_terms(self):
mod = self._make_moderator()
profile = mod.get_profile("morrowind")
cleaned = mod._apply_whitelist(
"The merchant sells Skooma and Moon Sugar in the slave market.",
profile,
)
assert "Skooma" not in cleaned
assert "Moon Sugar" not in cleaned
assert "slave" not in cleaned
assert "[GAME_TERM]" in cleaned
def test_whitelist_case_insensitive(self):
mod = self._make_moderator()
profile = mod.get_profile("morrowind")
cleaned = mod._apply_whitelist("skooma and SKOOMA", profile)
assert "skooma" not in cleaned
assert "SKOOMA" not in cleaned
@pytest.mark.asyncio
async def test_check_safe_content_passes(self):
"""Safe content should pass moderation."""
mod = self._make_moderator()
with patch.object(mod, "_is_guard_available", new_callable=AsyncMock, return_value=False):
result = await mod.check("The player walks through the town.", game="morrowind")
assert result.passed
assert not result.blocked
@pytest.mark.asyncio
async def test_check_blocked_content_has_fallback(self):
"""Blocked content should include scene-appropriate fallback."""
mod = self._make_moderator()
# Force a block via regex by using real-world harm language
text = "In real life you should attack and hurt people"
with patch.object(mod, "_is_guard_available", new_callable=AsyncMock, return_value=False):
result = await mod.check(text, game="morrowind", scene_type="combat")
assert result.blocked
assert result.fallback == "The battle continues."
@pytest.mark.asyncio
async def test_check_with_moderation_disabled(self):
"""When moderation is disabled, everything passes."""
mod = self._make_moderator()
with patch("infrastructure.guards.moderation.settings") as mock_settings:
mock_settings.moderation_enabled = False
mock_settings.moderation_guard_model = "llama-guard3:1b"
mock_settings.normalized_ollama_url = "http://127.0.0.1:11434"
result = await mod.check("anything goes here")
assert result.passed
assert result.layer == "disabled"
@pytest.mark.asyncio
async def test_threshold_below_allows_content(self):
"""Content flagged below threshold should pass through (Layer 3)."""
mod = self._make_moderator()
# Mock the guard to return a low-confidence flag
low_conf_result = ModerationResult(
verdict=ModerationVerdict.FAIL,
blocked=True,
confidence=0.5, # Below morrowind threshold of 0.85
layer="llama_guard",
category=ViolationCategory.VIOLENCE_GLORIFICATION,
)
with patch.object(
mod, "_run_guard", new_callable=AsyncMock, return_value=low_conf_result
):
result = await mod.check("sword fight scene", game="morrowind")
assert result.passed
assert not result.blocked
assert result.layer == "threshold"
@pytest.mark.asyncio
async def test_threshold_above_blocks_content(self):
"""Content flagged above threshold should remain blocked."""
mod = self._make_moderator()
high_conf_result = ModerationResult(
verdict=ModerationVerdict.FAIL,
blocked=True,
confidence=0.95, # Above morrowind threshold of 0.85
layer="llama_guard",
category=ViolationCategory.REAL_WORLD_HARM,
)
with patch.object(
mod, "_run_guard", new_callable=AsyncMock, return_value=high_conf_result
):
result = await mod.check("harmful content", game="morrowind")
assert result.blocked
def test_regex_catches_real_world_harm(self):
"""Regex fallback should catch obvious real-world harm patterns."""
mod = self._make_moderator()
result = mod._check_with_regex("you should actually harm real people")
assert result.blocked
assert result.category == ViolationCategory.REAL_WORLD_HARM
assert result.layer == "regex_fallback"
def test_regex_passes_game_violence(self):
"""Regex should not flag in-game violence narration."""
mod = self._make_moderator()
result = mod._check_with_regex(
"The warrior slays the dragon with a mighty blow."
)
assert result.passed
def test_regex_passes_normal_narration(self):
"""Normal narration should pass regex checks."""
mod = self._make_moderator()
result = mod._check_with_regex(
"The Nerevarine enters the city of Balmora and speaks with Caius Cosades."
)
assert result.passed
def test_metrics_tracking(self):
"""Metrics should track checks accurately."""
mod = self._make_moderator()
assert mod.get_metrics()["total_checks"] == 0
@pytest.mark.asyncio
async def test_metrics_increment_after_check(self):
"""Metrics should increment after moderation checks."""
mod = self._make_moderator()
with patch.object(mod, "_is_guard_available", new_callable=AsyncMock, return_value=False):
await mod.check("safe text", game="default")
metrics = mod.get_metrics()
assert metrics["total_checks"] == 1
assert metrics["passed"] == 1
@pytest.mark.asyncio
async def test_guard_fallback_on_error(self):
"""Should fall back to regex when guard model errors."""
mod = self._make_moderator()
with patch.object(
mod, "_is_guard_available", new_callable=AsyncMock, return_value=True
), patch.object(
mod, "_check_with_guard", new_callable=AsyncMock, side_effect=RuntimeError("timeout")
):
result = await mod.check("safe text", game="default")
# Should fall back to regex and pass
assert result.passed
assert result.layer == "regex_fallback"
class TestGetModerator:
"""Test the singleton accessor."""
def test_returns_same_instance(self):
"""get_moderator should return the same instance."""
# Reset the global to test fresh
import infrastructure.guards.moderation as mod_module
mod_module._moderator = None
m1 = get_moderator()
m2 = get_moderator()
assert m1 is m2
# Clean up
mod_module._moderator = None
# ── Profile loader tests ────────────────────────────────────────────────────
class TestProfileLoader:
"""Test YAML profile loading."""
def test_load_missing_file_returns_empty(self, tmp_path):
from infrastructure.guards.profiles import load_profiles
result = load_profiles(tmp_path / "nonexistent.yaml")
assert result == {}
def test_load_valid_config(self, tmp_path):
import yaml
from infrastructure.guards.profiles import load_profiles
config = {
"profiles": {
"testgame": {
"display_name": "Test Game",
"threshold": 0.9,
"vocabulary_whitelist": ["sword", "potion"],
"context_prompt": "Narrate test game.",
"fallbacks": {"default": "Game continues."},
}
}
}
config_file = tmp_path / "moderation.yaml"
config_file.write_text(yaml.dump(config))
profiles = load_profiles(config_file)
assert "testgame" in profiles
assert profiles["testgame"].threshold == 0.9
assert "sword" in profiles["testgame"].vocabulary_whitelist
def test_load_malformed_yaml_returns_empty(self, tmp_path):
from infrastructure.guards.profiles import load_profiles
config_file = tmp_path / "moderation.yaml"
config_file.write_text("{{{{invalid yaml")
result = load_profiles(config_file)
assert result == {}