Compare commits

..

3 Commits

Author SHA1 Message Date
6eeee39c10 test(#922): Add tests for tool hallucination detection
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 1m15s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 1m8s
Tests / e2e (pull_request) Successful in 3m44s
Tests / test (pull_request) Failing after 1h9m15s
Tests for validation firewall:
- Unknown tool detection
- Missing required params
- Wrong type detection
- Hallucination patterns
- Rejection stats

Refs #922
2026-04-21 05:38:54 +00:00
b2d2d2c650 fix(#922): Poka-yoke — detect and block tool hallucination
Validation firewall between LLM tool-call output and execution:
1. Unknown tool names rejected
2. Malformed parameters caught
3. Missing required arguments detected
4. Hallucination patterns detected

All rejections logged with model provenance.
Agent receives rejection as tool result for self-correction.

Resolves #922
2026-04-21 05:38:22 +00:00
c6f2855745 fix: restore _format_error helper for test compatibility (#916)
Some checks failed
Docker Build and Publish / build-and-push (push) Has been skipped
Nix / nix (ubuntu-latest) (push) Failing after 2s
Tests / e2e (push) Successful in 2m47s
Tests / test (push) Failing after 27m41s
Build Skills Index / build-index (push) Has been skipped
Build Skills Index / deploy-with-index (push) Has been skipped
Nix / nix (macos-latest) (push) Has been cancelled
fix: restore _format_error helper for test compatibility (#916)
2026-04-20 23:56:27 +00:00
5 changed files with 407 additions and 186 deletions

View File

@@ -8,7 +8,6 @@ Handles loading and validating configuration for:
- Delivery preferences
"""
import ipaddress
import logging
import os
import json
@@ -680,26 +679,6 @@ def load_gateway_config() -> GatewayConfig:
return config
def _is_network_accessible(host: str) -> bool:
"""Return True if *host* would expose a server beyond the loopback interface.
Duplicates the logic in ``gateway.platforms.base.is_network_accessible``
without creating a circular import (base.py imports from this module).
"""
try:
addr = ipaddress.ip_address(host)
if addr.is_loopback:
return False
# ::ffff:127.x.x.x — Python's is_loopback returns False for
# IPv4-mapped loopback; unwrap and check the underlying IPv4.
if getattr(addr, "ipv4_mapped", None) and addr.ipv4_mapped.is_loopback:
return False
return True
except ValueError:
# Hostname: assume it could be network-accessible.
return True
def _validate_gateway_config(config: "GatewayConfig") -> None:
"""Validate and sanitize a loaded GatewayConfig in place.
@@ -768,22 +747,6 @@ def _validate_gateway_config(config: "GatewayConfig") -> None:
)
pconfig.enabled = False
# Warn when the API server is enabled on a network-accessible address
# without an auth key. The adapter will refuse to start anyway, but
# surfacing this at config-load time lets operators see the problem in
# the startup log before any platform adapter initialisation runs.
api_cfg = config.platforms.get(Platform.API_SERVER)
if api_cfg and api_cfg.enabled:
key = api_cfg.extra.get("key", "")
host = api_cfg.extra.get("host", "127.0.0.1")
if not key and _is_network_accessible(host):
logger.warning(
"API Server is enabled on %s but API_SERVER_KEY is not set. "
"The adapter will refuse to start on a network-accessible address. "
"Set API_SERVER_KEY or bind to 127.0.0.1 for local-only access.",
host,
)
def _apply_env_overrides(config: GatewayConfig) -> None:
"""Apply environment variable overrides to config."""

View File

@@ -10,7 +10,6 @@ from gateway.config import (
PlatformConfig,
SessionResetPolicy,
_apply_env_overrides,
_validate_gateway_config,
load_gateway_config,
)
@@ -295,151 +294,3 @@ class TestHomeChannelEnvOverrides:
home = config.platforms[platform].home_channel
assert home is not None, f"{platform.value}: home_channel should not be None"
assert (home.chat_id, home.name) == expected, platform.value
class TestValidateGatewayConfig:
"""Tests for _validate_gateway_config — in-place sanitisation of loaded config."""
# -- idle_minutes validation --
def test_idle_minutes_zero_is_corrected_to_default(self):
config = GatewayConfig()
config.default_reset_policy.idle_minutes = 0
_validate_gateway_config(config)
assert config.default_reset_policy.idle_minutes == 1440
def test_idle_minutes_negative_is_corrected_to_default(self):
config = GatewayConfig()
config.default_reset_policy.idle_minutes = -60
_validate_gateway_config(config)
assert config.default_reset_policy.idle_minutes == 1440
def test_idle_minutes_none_is_corrected_to_default(self):
config = GatewayConfig()
config.default_reset_policy.idle_minutes = None # type: ignore[assignment]
_validate_gateway_config(config)
assert config.default_reset_policy.idle_minutes == 1440
def test_valid_idle_minutes_is_unchanged(self):
config = GatewayConfig()
config.default_reset_policy.idle_minutes = 90
_validate_gateway_config(config)
assert config.default_reset_policy.idle_minutes == 90
# -- at_hour validation --
def test_at_hour_too_high_is_corrected_to_default(self):
config = GatewayConfig()
config.default_reset_policy.at_hour = 24
_validate_gateway_config(config)
assert config.default_reset_policy.at_hour == 4
def test_at_hour_negative_is_corrected_to_default(self):
config = GatewayConfig()
config.default_reset_policy.at_hour = -1
_validate_gateway_config(config)
assert config.default_reset_policy.at_hour == 4
def test_valid_at_hour_is_unchanged(self):
config = GatewayConfig()
config.default_reset_policy.at_hour = 3
_validate_gateway_config(config)
assert config.default_reset_policy.at_hour == 3
def test_at_hour_boundary_values_are_valid(self):
for valid_hour in (0, 23):
config = GatewayConfig()
config.default_reset_policy.at_hour = valid_hour
_validate_gateway_config(config)
assert config.default_reset_policy.at_hour == valid_hour
# -- empty-token warning (enabled platforms) --
def test_empty_string_token_logs_warning(self, caplog):
import logging
config = GatewayConfig(
platforms={
Platform.TELEGRAM: PlatformConfig(enabled=True, token=""),
}
)
with caplog.at_level(logging.WARNING, logger="gateway.config"):
_validate_gateway_config(config)
assert any(
"TELEGRAM_BOT_TOKEN" in r.message and "empty" in r.message
for r in caplog.records
)
def test_disabled_platform_with_empty_token_no_warning(self, caplog):
import logging
config = GatewayConfig(
platforms={
Platform.TELEGRAM: PlatformConfig(enabled=False, token=""),
}
)
with caplog.at_level(logging.WARNING, logger="gateway.config"):
_validate_gateway_config(config)
assert not any("TELEGRAM_BOT_TOKEN" in r.message for r in caplog.records)
# -- API Server key / binding warnings --
def test_api_server_network_binding_without_key_logs_warning(self, caplog):
import logging
config = GatewayConfig(
platforms={
Platform.API_SERVER: PlatformConfig(
enabled=True,
extra={"host": "0.0.0.0"},
),
}
)
with caplog.at_level(logging.WARNING, logger="gateway.config"):
_validate_gateway_config(config)
assert any(
"API_SERVER_KEY" in r.message for r in caplog.records
)
def test_api_server_loopback_without_key_no_warning(self, caplog):
import logging
config = GatewayConfig(
platforms={
Platform.API_SERVER: PlatformConfig(
enabled=True,
extra={"host": "127.0.0.1"},
),
}
)
with caplog.at_level(logging.WARNING, logger="gateway.config"):
_validate_gateway_config(config)
assert not any(
"API_SERVER_KEY" in r.message for r in caplog.records
)
def test_api_server_network_binding_with_key_no_warning(self, caplog):
import logging
config = GatewayConfig(
platforms={
Platform.API_SERVER: PlatformConfig(
enabled=True,
extra={"host": "0.0.0.0", "key": "sk-real-key-here"},
),
}
)
with caplog.at_level(logging.WARNING, logger="gateway.config"):
_validate_gateway_config(config)
assert not any(
"API_SERVER_KEY" in r.message for r in caplog.records
)
def test_api_server_default_loopback_without_key_no_warning(self, caplog):
"""API server with no explicit host defaults to 127.0.0.1 — no warning."""
import logging
config = GatewayConfig(
platforms={
Platform.API_SERVER: PlatformConfig(enabled=True),
}
)
with caplog.at_level(logging.WARNING, logger="gateway.config"):
_validate_gateway_config(config)
assert not any(
"API_SERVER_KEY" in r.message for r in caplog.records
)

View File

@@ -0,0 +1,67 @@
"""
Tests for tool hallucination detection (#922).
"""
import pytest
from tools.tool_validator import ToolHallucinationDetector, ValidationSeverity
class TestToolHallucinationDetector:
def setup_method(self):
self.detector = ToolHallucinationDetector()
self.detector.register_tool("read_file", {
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string"},
"encoding": {"type": "string"},
},
"required": ["path"]
}
})
def test_valid_tool_call(self):
result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt"})
assert result.valid is True
assert len(result.blocking_issues) == 0
def test_unknown_tool(self):
result = self.detector.validate_tool_call("hallucinated_tool", {})
assert result.valid is False
assert any(i.code == "UNKNOWN_TOOL" for i in result.issues)
def test_missing_required_param(self):
result = self.detector.validate_tool_call("read_file", {})
assert result.valid is False
assert any(i.code == "MISSING_REQUIRED" for i in result.issues)
def test_wrong_type(self):
result = self.detector.validate_tool_call("read_file", {"path": 123})
assert result.valid is False
assert any(i.code == "WRONG_TYPE" for i in result.issues)
def test_unknown_param_warning(self):
result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt", "unknown": "value"})
assert result.valid is True # Warning, not blocking
assert any(i.code == "UNKNOWN_PARAM" for i in result.issues)
def test_placeholder_detection(self):
result = self.detector.validate_tool_call("read_file", {"path": "<placeholder>"})
assert any(i.code == "PLACEHOLDER_VALUE" for i in result.issues)
def test_rejection_stats(self):
self.detector.validate_tool_call("unknown_tool", {})
self.detector.validate_tool_call("read_file", {})
stats = self.detector.get_rejection_stats()
assert stats["total"] >= 2
def test_rejection_response(self):
from tools.tool_validator import create_rejection_response
result = self.detector.validate_tool_call("unknown_tool", {})
response = create_rejection_response(result)
assert response["role"] == "tool"
assert "rejected" in response["content"].lower()
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -44,6 +44,34 @@ from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
def _format_error(
message: str,
skill_name: str = None,
file_path: str = None,
suggestion: str = None,
context: dict = None,
) -> Dict[str, Any]:
"""Format an error with rich context for better debugging."""
parts = [message]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
# Import security scanner — agent-created skills get the same scrutiny as
# community hub installs.
try:

312
tools/tool_validator.py Normal file
View File

@@ -0,0 +1,312 @@
"""
Poka-Yoke: Tool Hallucination Detection — #922.
Validation firewall between LLM tool-call output and actual execution.
Detects and blocks:
1. Unknown tool names (hallucinated tools)
2. Malformed parameters (wrong types)
3. Missing required arguments
4. Extra unknown parameters
Poka-Yoke Type: Detection (catches errors at the boundary before harm)
"""
import json
import logging
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
class ValidationSeverity(Enum):
"""Severity of validation failure."""
BLOCK = "block" # Must block execution
WARN = "warn" # Warning, may proceed
INFO = "info" # Informational
@dataclass
class ValidationIssue:
"""A validation issue found."""
severity: ValidationSeverity
code: str
message: str
tool_name: str
parameter: Optional[str] = None
expected: Optional[str] = None
actual: Optional[Any] = None
@dataclass
class ValidationResult:
"""Result of tool call validation."""
valid: bool
tool_name: str
issues: List[ValidationIssue] = field(default_factory=list)
corrected_args: Optional[Dict[str, Any]] = None
@property
def blocking_issues(self) -> List[ValidationIssue]:
return [i for i in self.issues if i.severity == ValidationSeverity.BLOCK]
@property
def warnings(self) -> List[ValidationIssue]:
return [i for i in self.issues if i.severity == ValidationSeverity.WARN]
class ToolHallucinationDetector:
"""
Poka-yoke detector for tool hallucinations.
Validates tool calls against registered schemas before execution.
"""
def __init__(self, tool_registry: Optional[Dict] = None):
"""
Initialize detector.
Args:
tool_registry: Dict of tool_name -> tool_schema
"""
self.registry = tool_registry or {}
self._rejection_log: List[Dict] = []
def register_tool(self, name: str, schema: Dict):
"""Register a tool with its JSON Schema."""
self.registry[name] = schema
def register_tools(self, tools: Dict[str, Dict]):
"""Register multiple tools."""
self.registry.update(tools)
def validate_tool_call(
self,
tool_name: str,
arguments: Dict[str, Any],
model: str = "unknown",
) -> ValidationResult:
"""
Validate a tool call against the registry.
Args:
tool_name: Name of the tool being called
arguments: Arguments passed to the tool
model: Model that generated the call (for logging)
Returns:
ValidationResult with validation status
"""
issues = []
# 1. Check if tool exists
if tool_name not in self.registry:
issue = ValidationIssue(
severity=ValidationSeverity.BLOCK,
code="UNKNOWN_TOOL",
message=f"Tool '{tool_name}' does not exist. Available: {', '.join(sorted(self.registry.keys())[:10])}...",
tool_name=tool_name,
)
issues.append(issue)
self._log_rejection(tool_name, arguments, model, "UNKNOWN_TOOL")
return ValidationResult(valid=False, tool_name=tool_name, issues=issues)
schema = self.registry[tool_name]
params_schema = schema.get("parameters", {}).get("properties", {})
required = set(schema.get("parameters", {}).get("required", []))
# 2. Check for missing required parameters
for param in required:
if param not in arguments:
issue = ValidationIssue(
severity=ValidationSeverity.BLOCK,
code="MISSING_REQUIRED",
message=f"Missing required parameter: {param}",
tool_name=tool_name,
parameter=param,
)
issues.append(issue)
# 3. Check parameter types
for param_name, param_value in arguments.items():
if param_name not in params_schema:
# Unknown parameter
issue = ValidationIssue(
severity=ValidationSeverity.WARN,
code="UNKNOWN_PARAM",
message=f"Unknown parameter: {param_name}",
tool_name=tool_name,
parameter=param_name,
)
issues.append(issue)
continue
param_schema = params_schema[param_name]
expected_type = param_schema.get("type")
if expected_type and not self._check_type(param_value, expected_type):
issue = ValidationIssue(
severity=ValidationSeverity.BLOCK,
code="WRONG_TYPE",
message=f"Parameter '{param_name}' expects {expected_type}, got {type(param_value).__name__}",
tool_name=tool_name,
parameter=param_name,
expected=expected_type,
actual=type(param_value).__name__,
)
issues.append(issue)
# 4. Check for common hallucination patterns
hallucination_issues = self._detect_hallucination_patterns(tool_name, arguments)
issues.extend(hallucination_issues)
# Determine validity
has_blocking = any(i.severity == ValidationSeverity.BLOCK for i in issues)
if has_blocking:
self._log_rejection(tool_name, arguments, model,
"; ".join(i.code for i in issues if i.severity == ValidationSeverity.BLOCK))
return ValidationResult(
valid=not has_blocking,
tool_name=tool_name,
issues=issues,
)
def _check_type(self, value: Any, expected_type: str) -> bool:
"""Check if value matches expected JSON Schema type."""
type_map = {
"string": str,
"number": (int, float),
"integer": int,
"boolean": bool,
"array": list,
"object": dict,
}
expected = type_map.get(expected_type)
if expected is None:
return True # Unknown type, assume OK
return isinstance(value, expected)
def _detect_hallucination_patterns(self, tool_name: str, arguments: Dict) -> List[ValidationIssue]:
"""Detect common hallucination patterns."""
issues = []
# Pattern 1: Placeholder values
placeholder_patterns = [
r"^<.*>$", # <placeholder>
r"^\[.*\]$", # [placeholder]
r"^TODO$|^FIXME$", # TODO/FIXME
r"^example\.com$", # example.com
r"^127\.0\.0\.1$", # localhost
]
for param_name, param_value in arguments.items():
if isinstance(param_value, str):
for pattern in placeholder_patterns:
if re.match(pattern, param_value, re.IGNORECASE):
issues.append(ValidationIssue(
severity=ValidationSeverity.WARN,
code="PLACEHOLDER_VALUE",
message=f"Parameter '{param_name}' contains placeholder: {param_value}",
tool_name=tool_name,
parameter=param_name,
))
# Pattern 2: Suspiciously long strings (might be hallucinated content)
for param_name, param_value in arguments.items():
if isinstance(param_value, str) and len(param_value) > 10000:
issues.append(ValidationIssue(
severity=ValidationSeverity.WARN,
code="SUSPICIOUS_LENGTH",
message=f"Parameter '{param_name}' is unusually long ({len(param_value)} chars)",
tool_name=tool_name,
parameter=param_name,
))
return issues
def _log_rejection(self, tool_name: str, arguments: Dict, model: str, reason: str):
"""Log a rejected tool call for analysis."""
import time
entry = {
"timestamp": time.time(),
"tool_name": tool_name,
"arguments": {k: str(v)[:100] for k, v in arguments.items()},
"model": model,
"reason": reason,
}
self._rejection_log.append(entry)
# Keep log bounded
if len(self._rejection_log) > 1000:
self._rejection_log = self._rejection_log[-500:]
logger.warning(
"Tool hallucination blocked: tool=%s, model=%s, reason=%s",
tool_name, model, reason
)
def get_rejection_stats(self) -> Dict:
"""Get statistics on rejected tool calls."""
if not self._rejection_log:
return {"total": 0, "by_reason": {}, "by_tool": {}}
by_reason = {}
by_tool = {}
for entry in self._rejection_log:
reason = entry["reason"]
tool = entry["tool_name"]
by_reason[reason] = by_reason.get(reason, 0) + 1
by_tool[tool] = by_tool.get(tool, 0) + 1
return {
"total": len(self._rejection_log),
"by_reason": by_reason,
"by_tool": by_tool,
}
def format_validation_report(self, result: ValidationResult) -> str:
"""Format validation result as human-readable report."""
if result.valid:
return f"{result.tool_name}: valid"
lines = [f"{result.tool_name}: BLOCKED"]
for issue in result.blocking_issues:
lines.append(f" [{issue.code}] {issue.message}")
for issue in result.warnings:
lines.append(f" ⚠️ [{issue.code}] {issue.message}")
return "\n".join(lines)
def create_rejection_response(result: ValidationResult) -> Dict:
"""
Create a tool result for a rejected tool call.
This allows the agent to see the rejection and self-correct.
"""
issues_text = "\n".join(
f"- [{i.code}] {i.message}"
for i in result.blocking_issues
)
return {
"role": "tool",
"content": f"""Tool call rejected: {result.tool_name}
Issues found:
{issues_text}
Please check the tool name and parameters, then try again with valid arguments.""",
}