Compare commits
2 Commits
fix/922
...
fix/891-pr
| Author | SHA1 | Date | |
|---|---|---|---|
| a0ed1e6ff2 | |||
| b5ba272efe |
262
agent/profile_isolation.py
Normal file
262
agent/profile_isolation.py
Normal file
@@ -0,0 +1,262 @@
|
||||
"""
|
||||
Profile Session Isolation — #891
|
||||
|
||||
Tags sessions with their originating profile and provides
|
||||
filtered access so profiles cannot see each other's data.
|
||||
|
||||
Current state: All sessions share one state.db with no profile tag.
|
||||
This module adds profile tagging and filtered queries.
|
||||
|
||||
Usage:
|
||||
from agent.profile_isolation import tag_session, get_profile_sessions, get_active_profile
|
||||
|
||||
# Tag a new session with the current profile
|
||||
tag_session(session_id, profile_name)
|
||||
|
||||
# Get sessions for a specific profile
|
||||
sessions = get_profile_sessions("sprint")
|
||||
|
||||
# Get current active profile
|
||||
profile = get_active_profile()
|
||||
"""
|
||||
|
||||
import json
|
||||
import os
|
||||
import sqlite3
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, List, Optional
|
||||
from datetime import datetime, timezone
|
||||
|
||||
HERMES_HOME = Path(os.getenv("HERMES_HOME", str(Path.home() / ".hermes")))
|
||||
SESSIONS_DB = HERMES_HOME / "sessions" / "state.db"
|
||||
PROFILE_TAGS_FILE = HERMES_HOME / "profile_session_tags.json"
|
||||
|
||||
|
||||
def get_active_profile() -> str:
|
||||
"""Get the currently active profile name."""
|
||||
config_path = HERMES_HOME / "config.yaml"
|
||||
if config_path.exists():
|
||||
try:
|
||||
import yaml
|
||||
with open(config_path) as f:
|
||||
cfg = yaml.safe_load(f) or {}
|
||||
return cfg.get("active_profile", "default")
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
# Check environment
|
||||
return os.getenv("HERMES_PROFILE", "default")
|
||||
|
||||
|
||||
def _load_tags() -> Dict[str, str]:
|
||||
"""Load session-to-profile mapping."""
|
||||
if not PROFILE_TAGS_FILE.exists():
|
||||
return {}
|
||||
try:
|
||||
with open(PROFILE_TAGS_FILE) as f:
|
||||
return json.load(f)
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
|
||||
def _save_tags(tags: Dict[str, str]):
|
||||
"""Save session-to-profile mapping."""
|
||||
PROFILE_TAGS_FILE.parent.mkdir(parents=True, exist_ok=True)
|
||||
with open(PROFILE_TAGS_FILE, "w") as f:
|
||||
json.dump(tags, f, indent=2)
|
||||
|
||||
|
||||
def tag_session(session_id: str, profile: Optional[str] = None) -> str:
|
||||
"""
|
||||
Tag a session with its originating profile.
|
||||
|
||||
Returns the profile name used.
|
||||
"""
|
||||
if profile is None:
|
||||
profile = get_active_profile()
|
||||
|
||||
tags = _load_tags()
|
||||
tags[session_id] = profile
|
||||
_save_tags(tags)
|
||||
|
||||
# Also tag in SQLite if available
|
||||
_tag_session_in_db(session_id, profile)
|
||||
|
||||
return profile
|
||||
|
||||
|
||||
def _tag_session_in_db(session_id: str, profile: str):
|
||||
"""Add profile tag to SQLite session store."""
|
||||
if not SESSIONS_DB.exists():
|
||||
return
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(SESSIONS_DB))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Check if sessions table has profile column
|
||||
cursor.execute("PRAGMA table_info(sessions)")
|
||||
columns = [row[1] for row in cursor.fetchall()]
|
||||
|
||||
if "profile" not in columns:
|
||||
# Add profile column
|
||||
cursor.execute("ALTER TABLE sessions ADD COLUMN profile TEXT DEFAULT 'default'")
|
||||
|
||||
# Update the session's profile
|
||||
cursor.execute(
|
||||
"UPDATE sessions SET profile = ? WHERE session_id = ?",
|
||||
(profile, session_id)
|
||||
)
|
||||
|
||||
conn.commit()
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass # SQLite might not be available or schema differs
|
||||
|
||||
|
||||
def get_session_profile(session_id: str) -> Optional[str]:
|
||||
"""Get the profile that owns a session."""
|
||||
# Check JSON tags first
|
||||
tags = _load_tags()
|
||||
if session_id in tags:
|
||||
return tags[session_id]
|
||||
|
||||
# Check SQLite
|
||||
if SESSIONS_DB.exists():
|
||||
try:
|
||||
conn = sqlite3.connect(str(SESSIONS_DB))
|
||||
cursor = conn.cursor()
|
||||
cursor.execute(
|
||||
"SELECT profile FROM sessions WHERE session_id = ?",
|
||||
(session_id,)
|
||||
)
|
||||
row = cursor.fetchone()
|
||||
conn.close()
|
||||
if row:
|
||||
return row[0]
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return None
|
||||
|
||||
|
||||
def get_profile_sessions(
|
||||
profile: Optional[str] = None,
|
||||
limit: int = 100,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Get sessions belonging to a specific profile.
|
||||
|
||||
Returns list of session dicts.
|
||||
"""
|
||||
if profile is None:
|
||||
profile = get_active_profile()
|
||||
|
||||
sessions = []
|
||||
|
||||
# Get from JSON tags
|
||||
tags = _load_tags()
|
||||
tagged_sessions = [sid for sid, p in tags.items() if p == profile]
|
||||
|
||||
# Get from SQLite with profile filter
|
||||
if SESSIONS_DB.exists():
|
||||
try:
|
||||
conn = sqlite3.connect(str(SESSIONS_DB))
|
||||
conn.row_factory = sqlite3.Row
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Try profile column first
|
||||
try:
|
||||
cursor.execute(
|
||||
"SELECT * FROM sessions WHERE profile = ? ORDER BY updated_at DESC LIMIT ?",
|
||||
(profile, limit)
|
||||
)
|
||||
for row in cursor.fetchall():
|
||||
sessions.append(dict(row))
|
||||
except Exception:
|
||||
# Fallback: filter by tagged session IDs
|
||||
if tagged_sessions:
|
||||
placeholders = ",".join("?" * len(tagged_sessions[:limit]))
|
||||
cursor.execute(
|
||||
f"SELECT * FROM sessions WHERE session_id IN ({placeholders}) ORDER BY updated_at DESC LIMIT ?",
|
||||
(*tagged_sessions[:limit], limit)
|
||||
)
|
||||
for row in cursor.fetchall():
|
||||
sessions.append(dict(row))
|
||||
|
||||
conn.close()
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
return sessions[:limit]
|
||||
|
||||
|
||||
def filter_sessions_by_profile(
|
||||
sessions: List[Dict[str, Any]],
|
||||
profile: Optional[str] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""Filter a list of sessions to only include those belonging to a profile."""
|
||||
if profile is None:
|
||||
profile = get_active_profile()
|
||||
|
||||
tags = _load_tags()
|
||||
filtered = []
|
||||
|
||||
for session in sessions:
|
||||
sid = session.get("session_id") or session.get("id")
|
||||
if not sid:
|
||||
continue
|
||||
|
||||
# Check tag
|
||||
session_profile = tags.get(sid)
|
||||
if session_profile is None:
|
||||
# Check SQLite
|
||||
session_profile = get_session_profile(sid)
|
||||
|
||||
if session_profile == profile or session_profile is None:
|
||||
filtered.append(session)
|
||||
|
||||
return filtered
|
||||
|
||||
|
||||
def get_profile_stats() -> Dict[str, Any]:
|
||||
"""Get statistics about profile session distribution."""
|
||||
tags = _load_tags()
|
||||
|
||||
profile_counts = {}
|
||||
for sid, profile in tags.items():
|
||||
profile_counts[profile] = profile_counts.get(profile, 0) + 1
|
||||
|
||||
total_tagged = len(tags)
|
||||
profiles = list(profile_counts.keys())
|
||||
|
||||
return {
|
||||
"total_tagged_sessions": total_tagged,
|
||||
"profiles": profiles,
|
||||
"profile_counts": profile_counts,
|
||||
"active_profile": get_active_profile(),
|
||||
}
|
||||
|
||||
|
||||
def audit_untagged_sessions() -> List[str]:
|
||||
"""Find sessions without a profile tag."""
|
||||
if not SESSIONS_DB.exists():
|
||||
return []
|
||||
|
||||
try:
|
||||
conn = sqlite3.connect(str(SESSIONS_DB))
|
||||
cursor = conn.cursor()
|
||||
|
||||
# Get all session IDs
|
||||
cursor.execute("SELECT session_id FROM sessions")
|
||||
all_sessions = {row[0] for row in cursor.fetchall()}
|
||||
conn.close()
|
||||
|
||||
# Get tagged sessions
|
||||
tags = _load_tags()
|
||||
tagged = set(tags.keys())
|
||||
|
||||
# Return untagged
|
||||
return list(all_sessions - tagged)
|
||||
except Exception:
|
||||
return []
|
||||
76
tests/test_profile_isolation.py
Normal file
76
tests/test_profile_isolation.py
Normal file
@@ -0,0 +1,76 @@
|
||||
"""Tests for profile session isolation (#891)."""
|
||||
|
||||
import sys
|
||||
import json
|
||||
import tempfile
|
||||
from pathlib import Path
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).parent.parent))
|
||||
|
||||
# Override paths for testing
|
||||
import agent.profile_isolation as iso_mod
|
||||
_test_dir = Path(tempfile.mkdtemp())
|
||||
iso_mod.PROFILE_TAGS_FILE = _test_dir / "tags.json"
|
||||
|
||||
|
||||
def test_tag_session():
|
||||
"""Session gets tagged with profile."""
|
||||
profile = iso_mod.tag_session("sess-1", "sprint")
|
||||
assert profile == "sprint"
|
||||
assert iso_mod.get_session_profile("sess-1") == "sprint"
|
||||
|
||||
|
||||
def test_default_profile():
|
||||
"""Sessions tagged with default when no profile specified."""
|
||||
profile = iso_mod.tag_session("sess-2")
|
||||
assert profile is not None
|
||||
|
||||
|
||||
def test_get_session_profile():
|
||||
"""Can retrieve profile for tagged session."""
|
||||
iso_mod.tag_session("sess-3", "fenrir")
|
||||
assert iso_mod.get_session_profile("sess-3") == "fenrir"
|
||||
|
||||
|
||||
def test_untagged_returns_none():
|
||||
"""Untagged session returns None."""
|
||||
assert iso_mod.get_session_profile("nonexistent") is None
|
||||
|
||||
|
||||
def test_profile_stats():
|
||||
"""Stats reflect tagged sessions."""
|
||||
iso_mod.tag_session("s1", "default")
|
||||
iso_mod.tag_session("s2", "sprint")
|
||||
iso_mod.tag_session("s3", "sprint")
|
||||
stats = iso_mod.get_profile_stats()
|
||||
assert stats["total_tagged_sessions"] >= 3
|
||||
assert "sprint" in stats["profile_counts"]
|
||||
|
||||
|
||||
def test_filter_sessions():
|
||||
"""Filter returns only matching profile sessions."""
|
||||
iso_mod.tag_session("filter-1", "alpha")
|
||||
iso_mod.tag_session("filter-2", "beta")
|
||||
iso_mod.tag_session("filter-3", "alpha")
|
||||
|
||||
sessions = [
|
||||
{"session_id": "filter-1"},
|
||||
{"session_id": "filter-2"},
|
||||
{"session_id": "filter-3"},
|
||||
]
|
||||
|
||||
filtered = iso_mod.filter_sessions_by_profile(sessions, "alpha")
|
||||
ids = [s["session_id"] for s in filtered]
|
||||
assert "filter-1" in ids
|
||||
assert "filter-3" in ids
|
||||
assert "filter-2" not in ids
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
tests = [test_tag_session, test_default_profile, test_get_session_profile,
|
||||
test_untagged_returns_none, test_profile_stats, test_filter_sessions]
|
||||
for t in tests:
|
||||
print(f"Running {t.__name__}...")
|
||||
t()
|
||||
print(" PASS")
|
||||
print("\nAll tests passed.")
|
||||
@@ -1,67 +0,0 @@
|
||||
"""
|
||||
Tests for tool hallucination detection (#922).
|
||||
"""
|
||||
|
||||
import pytest
|
||||
from tools.tool_validator import ToolHallucinationDetector, ValidationSeverity
|
||||
|
||||
|
||||
class TestToolHallucinationDetector:
|
||||
def setup_method(self):
|
||||
self.detector = ToolHallucinationDetector()
|
||||
self.detector.register_tool("read_file", {
|
||||
"parameters": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"path": {"type": "string"},
|
||||
"encoding": {"type": "string"},
|
||||
},
|
||||
"required": ["path"]
|
||||
}
|
||||
})
|
||||
|
||||
def test_valid_tool_call(self):
|
||||
result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt"})
|
||||
assert result.valid is True
|
||||
assert len(result.blocking_issues) == 0
|
||||
|
||||
def test_unknown_tool(self):
|
||||
result = self.detector.validate_tool_call("hallucinated_tool", {})
|
||||
assert result.valid is False
|
||||
assert any(i.code == "UNKNOWN_TOOL" for i in result.issues)
|
||||
|
||||
def test_missing_required_param(self):
|
||||
result = self.detector.validate_tool_call("read_file", {})
|
||||
assert result.valid is False
|
||||
assert any(i.code == "MISSING_REQUIRED" for i in result.issues)
|
||||
|
||||
def test_wrong_type(self):
|
||||
result = self.detector.validate_tool_call("read_file", {"path": 123})
|
||||
assert result.valid is False
|
||||
assert any(i.code == "WRONG_TYPE" for i in result.issues)
|
||||
|
||||
def test_unknown_param_warning(self):
|
||||
result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt", "unknown": "value"})
|
||||
assert result.valid is True # Warning, not blocking
|
||||
assert any(i.code == "UNKNOWN_PARAM" for i in result.issues)
|
||||
|
||||
def test_placeholder_detection(self):
|
||||
result = self.detector.validate_tool_call("read_file", {"path": "<placeholder>"})
|
||||
assert any(i.code == "PLACEHOLDER_VALUE" for i in result.issues)
|
||||
|
||||
def test_rejection_stats(self):
|
||||
self.detector.validate_tool_call("unknown_tool", {})
|
||||
self.detector.validate_tool_call("read_file", {})
|
||||
stats = self.detector.get_rejection_stats()
|
||||
assert stats["total"] >= 2
|
||||
|
||||
def test_rejection_response(self):
|
||||
from tools.tool_validator import create_rejection_response
|
||||
result = self.detector.validate_tool_call("unknown_tool", {})
|
||||
response = create_rejection_response(result)
|
||||
assert response["role"] == "tool"
|
||||
assert "rejected" in response["content"].lower()
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
pytest.main([__file__])
|
||||
@@ -44,34 +44,6 @@ from typing import Dict, Any, Optional, Tuple
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def _format_error(
|
||||
message: str,
|
||||
skill_name: str = None,
|
||||
file_path: str = None,
|
||||
suggestion: str = None,
|
||||
context: dict = None,
|
||||
) -> Dict[str, Any]:
|
||||
"""Format an error with rich context for better debugging."""
|
||||
parts = [message]
|
||||
if skill_name:
|
||||
parts.append(f"Skill: {skill_name}")
|
||||
if file_path:
|
||||
parts.append(f"File: {file_path}")
|
||||
if suggestion:
|
||||
parts.append(f"Suggestion: {suggestion}")
|
||||
if context:
|
||||
for key, value in context.items():
|
||||
parts.append(f"{key}: {value}")
|
||||
return {
|
||||
"success": False,
|
||||
"error": " | ".join(parts),
|
||||
"skill_name": skill_name,
|
||||
"file_path": file_path,
|
||||
"suggestion": suggestion,
|
||||
}
|
||||
|
||||
|
||||
# Import security scanner — agent-created skills get the same scrutiny as
|
||||
# community hub installs.
|
||||
try:
|
||||
|
||||
@@ -1,312 +0,0 @@
|
||||
"""
|
||||
Poka-Yoke: Tool Hallucination Detection — #922.
|
||||
|
||||
Validation firewall between LLM tool-call output and actual execution.
|
||||
|
||||
Detects and blocks:
|
||||
1. Unknown tool names (hallucinated tools)
|
||||
2. Malformed parameters (wrong types)
|
||||
3. Missing required arguments
|
||||
4. Extra unknown parameters
|
||||
|
||||
Poka-Yoke Type: Detection (catches errors at the boundary before harm)
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, List, Optional, Set
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ValidationSeverity(Enum):
|
||||
"""Severity of validation failure."""
|
||||
BLOCK = "block" # Must block execution
|
||||
WARN = "warn" # Warning, may proceed
|
||||
INFO = "info" # Informational
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationIssue:
|
||||
"""A validation issue found."""
|
||||
severity: ValidationSeverity
|
||||
code: str
|
||||
message: str
|
||||
tool_name: str
|
||||
parameter: Optional[str] = None
|
||||
expected: Optional[str] = None
|
||||
actual: Optional[Any] = None
|
||||
|
||||
|
||||
@dataclass
|
||||
class ValidationResult:
|
||||
"""Result of tool call validation."""
|
||||
valid: bool
|
||||
tool_name: str
|
||||
issues: List[ValidationIssue] = field(default_factory=list)
|
||||
corrected_args: Optional[Dict[str, Any]] = None
|
||||
|
||||
@property
|
||||
def blocking_issues(self) -> List[ValidationIssue]:
|
||||
return [i for i in self.issues if i.severity == ValidationSeverity.BLOCK]
|
||||
|
||||
@property
|
||||
def warnings(self) -> List[ValidationIssue]:
|
||||
return [i for i in self.issues if i.severity == ValidationSeverity.WARN]
|
||||
|
||||
|
||||
class ToolHallucinationDetector:
|
||||
"""
|
||||
Poka-yoke detector for tool hallucinations.
|
||||
|
||||
Validates tool calls against registered schemas before execution.
|
||||
"""
|
||||
|
||||
def __init__(self, tool_registry: Optional[Dict] = None):
|
||||
"""
|
||||
Initialize detector.
|
||||
|
||||
Args:
|
||||
tool_registry: Dict of tool_name -> tool_schema
|
||||
"""
|
||||
self.registry = tool_registry or {}
|
||||
self._rejection_log: List[Dict] = []
|
||||
|
||||
def register_tool(self, name: str, schema: Dict):
|
||||
"""Register a tool with its JSON Schema."""
|
||||
self.registry[name] = schema
|
||||
|
||||
def register_tools(self, tools: Dict[str, Dict]):
|
||||
"""Register multiple tools."""
|
||||
self.registry.update(tools)
|
||||
|
||||
def validate_tool_call(
|
||||
self,
|
||||
tool_name: str,
|
||||
arguments: Dict[str, Any],
|
||||
model: str = "unknown",
|
||||
) -> ValidationResult:
|
||||
"""
|
||||
Validate a tool call against the registry.
|
||||
|
||||
Args:
|
||||
tool_name: Name of the tool being called
|
||||
arguments: Arguments passed to the tool
|
||||
model: Model that generated the call (for logging)
|
||||
|
||||
Returns:
|
||||
ValidationResult with validation status
|
||||
"""
|
||||
issues = []
|
||||
|
||||
# 1. Check if tool exists
|
||||
if tool_name not in self.registry:
|
||||
issue = ValidationIssue(
|
||||
severity=ValidationSeverity.BLOCK,
|
||||
code="UNKNOWN_TOOL",
|
||||
message=f"Tool '{tool_name}' does not exist. Available: {', '.join(sorted(self.registry.keys())[:10])}...",
|
||||
tool_name=tool_name,
|
||||
)
|
||||
issues.append(issue)
|
||||
self._log_rejection(tool_name, arguments, model, "UNKNOWN_TOOL")
|
||||
return ValidationResult(valid=False, tool_name=tool_name, issues=issues)
|
||||
|
||||
schema = self.registry[tool_name]
|
||||
params_schema = schema.get("parameters", {}).get("properties", {})
|
||||
required = set(schema.get("parameters", {}).get("required", []))
|
||||
|
||||
# 2. Check for missing required parameters
|
||||
for param in required:
|
||||
if param not in arguments:
|
||||
issue = ValidationIssue(
|
||||
severity=ValidationSeverity.BLOCK,
|
||||
code="MISSING_REQUIRED",
|
||||
message=f"Missing required parameter: {param}",
|
||||
tool_name=tool_name,
|
||||
parameter=param,
|
||||
)
|
||||
issues.append(issue)
|
||||
|
||||
# 3. Check parameter types
|
||||
for param_name, param_value in arguments.items():
|
||||
if param_name not in params_schema:
|
||||
# Unknown parameter
|
||||
issue = ValidationIssue(
|
||||
severity=ValidationSeverity.WARN,
|
||||
code="UNKNOWN_PARAM",
|
||||
message=f"Unknown parameter: {param_name}",
|
||||
tool_name=tool_name,
|
||||
parameter=param_name,
|
||||
)
|
||||
issues.append(issue)
|
||||
continue
|
||||
|
||||
param_schema = params_schema[param_name]
|
||||
expected_type = param_schema.get("type")
|
||||
|
||||
if expected_type and not self._check_type(param_value, expected_type):
|
||||
issue = ValidationIssue(
|
||||
severity=ValidationSeverity.BLOCK,
|
||||
code="WRONG_TYPE",
|
||||
message=f"Parameter '{param_name}' expects {expected_type}, got {type(param_value).__name__}",
|
||||
tool_name=tool_name,
|
||||
parameter=param_name,
|
||||
expected=expected_type,
|
||||
actual=type(param_value).__name__,
|
||||
)
|
||||
issues.append(issue)
|
||||
|
||||
# 4. Check for common hallucination patterns
|
||||
hallucination_issues = self._detect_hallucination_patterns(tool_name, arguments)
|
||||
issues.extend(hallucination_issues)
|
||||
|
||||
# Determine validity
|
||||
has_blocking = any(i.severity == ValidationSeverity.BLOCK for i in issues)
|
||||
|
||||
if has_blocking:
|
||||
self._log_rejection(tool_name, arguments, model,
|
||||
"; ".join(i.code for i in issues if i.severity == ValidationSeverity.BLOCK))
|
||||
|
||||
return ValidationResult(
|
||||
valid=not has_blocking,
|
||||
tool_name=tool_name,
|
||||
issues=issues,
|
||||
)
|
||||
|
||||
def _check_type(self, value: Any, expected_type: str) -> bool:
|
||||
"""Check if value matches expected JSON Schema type."""
|
||||
type_map = {
|
||||
"string": str,
|
||||
"number": (int, float),
|
||||
"integer": int,
|
||||
"boolean": bool,
|
||||
"array": list,
|
||||
"object": dict,
|
||||
}
|
||||
|
||||
expected = type_map.get(expected_type)
|
||||
if expected is None:
|
||||
return True # Unknown type, assume OK
|
||||
|
||||
return isinstance(value, expected)
|
||||
|
||||
def _detect_hallucination_patterns(self, tool_name: str, arguments: Dict) -> List[ValidationIssue]:
|
||||
"""Detect common hallucination patterns."""
|
||||
issues = []
|
||||
|
||||
# Pattern 1: Placeholder values
|
||||
placeholder_patterns = [
|
||||
r"^<.*>$", # <placeholder>
|
||||
r"^\[.*\]$", # [placeholder]
|
||||
r"^TODO$|^FIXME$", # TODO/FIXME
|
||||
r"^example\.com$", # example.com
|
||||
r"^127\.0\.0\.1$", # localhost
|
||||
]
|
||||
|
||||
for param_name, param_value in arguments.items():
|
||||
if isinstance(param_value, str):
|
||||
for pattern in placeholder_patterns:
|
||||
if re.match(pattern, param_value, re.IGNORECASE):
|
||||
issues.append(ValidationIssue(
|
||||
severity=ValidationSeverity.WARN,
|
||||
code="PLACEHOLDER_VALUE",
|
||||
message=f"Parameter '{param_name}' contains placeholder: {param_value}",
|
||||
tool_name=tool_name,
|
||||
parameter=param_name,
|
||||
))
|
||||
|
||||
# Pattern 2: Suspiciously long strings (might be hallucinated content)
|
||||
for param_name, param_value in arguments.items():
|
||||
if isinstance(param_value, str) and len(param_value) > 10000:
|
||||
issues.append(ValidationIssue(
|
||||
severity=ValidationSeverity.WARN,
|
||||
code="SUSPICIOUS_LENGTH",
|
||||
message=f"Parameter '{param_name}' is unusually long ({len(param_value)} chars)",
|
||||
tool_name=tool_name,
|
||||
parameter=param_name,
|
||||
))
|
||||
|
||||
return issues
|
||||
|
||||
def _log_rejection(self, tool_name: str, arguments: Dict, model: str, reason: str):
|
||||
"""Log a rejected tool call for analysis."""
|
||||
import time
|
||||
|
||||
entry = {
|
||||
"timestamp": time.time(),
|
||||
"tool_name": tool_name,
|
||||
"arguments": {k: str(v)[:100] for k, v in arguments.items()},
|
||||
"model": model,
|
||||
"reason": reason,
|
||||
}
|
||||
|
||||
self._rejection_log.append(entry)
|
||||
|
||||
# Keep log bounded
|
||||
if len(self._rejection_log) > 1000:
|
||||
self._rejection_log = self._rejection_log[-500:]
|
||||
|
||||
logger.warning(
|
||||
"Tool hallucination blocked: tool=%s, model=%s, reason=%s",
|
||||
tool_name, model, reason
|
||||
)
|
||||
|
||||
def get_rejection_stats(self) -> Dict:
|
||||
"""Get statistics on rejected tool calls."""
|
||||
if not self._rejection_log:
|
||||
return {"total": 0, "by_reason": {}, "by_tool": {}}
|
||||
|
||||
by_reason = {}
|
||||
by_tool = {}
|
||||
|
||||
for entry in self._rejection_log:
|
||||
reason = entry["reason"]
|
||||
tool = entry["tool_name"]
|
||||
|
||||
by_reason[reason] = by_reason.get(reason, 0) + 1
|
||||
by_tool[tool] = by_tool.get(tool, 0) + 1
|
||||
|
||||
return {
|
||||
"total": len(self._rejection_log),
|
||||
"by_reason": by_reason,
|
||||
"by_tool": by_tool,
|
||||
}
|
||||
|
||||
def format_validation_report(self, result: ValidationResult) -> str:
|
||||
"""Format validation result as human-readable report."""
|
||||
if result.valid:
|
||||
return f"✅ {result.tool_name}: valid"
|
||||
|
||||
lines = [f"❌ {result.tool_name}: BLOCKED"]
|
||||
for issue in result.blocking_issues:
|
||||
lines.append(f" [{issue.code}] {issue.message}")
|
||||
|
||||
for issue in result.warnings:
|
||||
lines.append(f" ⚠️ [{issue.code}] {issue.message}")
|
||||
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
def create_rejection_response(result: ValidationResult) -> Dict:
|
||||
"""
|
||||
Create a tool result for a rejected tool call.
|
||||
|
||||
This allows the agent to see the rejection and self-correct.
|
||||
"""
|
||||
issues_text = "\n".join(
|
||||
f"- [{i.code}] {i.message}"
|
||||
for i in result.blocking_issues
|
||||
)
|
||||
|
||||
return {
|
||||
"role": "tool",
|
||||
"content": f"""Tool call rejected: {result.tool_name}
|
||||
|
||||
Issues found:
|
||||
{issues_text}
|
||||
|
||||
Please check the tool name and parameters, then try again with valid arguments.""",
|
||||
}
|
||||
Reference in New Issue
Block a user