Compare commits

..

3 Commits

Author SHA1 Message Date
6eeee39c10 test(#922): Add tests for tool hallucination detection
Some checks failed
Contributor Attribution Check / check-attribution (pull_request) Failing after 1m15s
Docker Build and Publish / build-and-push (pull_request) Has been skipped
Supply Chain Audit / Scan PR for supply chain risks (pull_request) Successful in 1m8s
Tests / e2e (pull_request) Successful in 3m44s
Tests / test (pull_request) Failing after 1h9m15s
Tests for validation firewall:
- Unknown tool detection
- Missing required params
- Wrong type detection
- Hallucination patterns
- Rejection stats

Refs #922
2026-04-21 05:38:54 +00:00
b2d2d2c650 fix(#922): Poka-yoke — detect and block tool hallucination
Validation firewall between LLM tool-call output and execution:
1. Unknown tool names rejected
2. Malformed parameters caught
3. Missing required arguments detected
4. Hallucination patterns detected

All rejections logged with model provenance.
Agent receives rejection as tool result for self-correction.

Resolves #922
2026-04-21 05:38:22 +00:00
c6f2855745 fix: restore _format_error helper for test compatibility (#916)
Some checks failed
Docker Build and Publish / build-and-push (push) Has been skipped
Nix / nix (ubuntu-latest) (push) Failing after 2s
Tests / e2e (push) Successful in 2m47s
Tests / test (push) Failing after 27m41s
Build Skills Index / build-index (push) Has been skipped
Build Skills Index / deploy-with-index (push) Has been skipped
Nix / nix (macos-latest) (push) Has been cancelled
fix: restore _format_error helper for test compatibility (#916)
2026-04-20 23:56:27 +00:00
5 changed files with 407 additions and 204 deletions

View File

@@ -1,146 +0,0 @@
"""Time-aware model routing for cron jobs.
Routes cron tasks to more capable models during off-hours when the user
is not present to correct errors. Reduces error rates during high-error
time windows (e.g., 18:00 evening batches).
Usage:
from agent.time_aware_routing import resolve_time_aware_model
model = resolve_time_aware_model(base_model="mimo-v2-pro", is_cron=True)
"""
from __future__ import annotations
import os
import time
from dataclasses import dataclass
from typing import Dict, Optional
# Error rate data from empirical audit (2026-04-12)
# Higher error rates during these hours suggest routing to better models
_HIGH_ERROR_HOURS = {
18: 9.4, # 18:00 — 9.4% error rate (evening cron batches)
19: 8.1,
20: 7.5,
21: 6.8,
22: 6.2,
23: 5.9,
0: 5.5,
1: 5.2,
}
# Low error hours — default model is fine
_LOW_ERROR_HOURS = set(range(6, 18)) # 06:00-17:59
# Default fallback models by time zone
_DEFAULT_STRONG_MODEL = os.getenv("CRON_STRONG_MODEL", "xiaomi/mimo-v2-pro")
_DEFAULT_CHEAP_MODEL = os.getenv("CRON_CHEAP_MODEL", "qwen2.5:7b")
_ERROR_THRESHOLD = float(os.getenv("CRON_ERROR_THRESHOLD", "6.0")) # % error rate
@dataclass
class RoutingDecision:
"""Result of time-aware routing."""
model: str
provider: str
reason: str
hour: int
error_rate: float
is_off_hours: bool
def get_hour_error_rate(hour: int) -> float:
"""Get expected error rate for a given hour (0-23)."""
return _HIGH_ERROR_HOURS.get(hour, 4.0) # Default 4% for unlisted hours
def is_off_hours(hour: int) -> bool:
"""Check if hour is considered off-hours (higher error rates)."""
return hour not in _LOW_ERROR_HOURS
def resolve_time_aware_model(
base_model: str = "",
base_provider: str = "",
is_cron: bool = False,
hour: Optional[int] = None,
) -> RoutingDecision:
"""Resolve model based on time of day and task type.
During off-hours (evening/night), routes to stronger models for cron
jobs to compensate for lack of human oversight.
Args:
base_model: The model that would normally be used.
base_provider: The provider for the base model.
is_cron: Whether this is a cron job (vs interactive session).
hour: Override hour (for testing). Defaults to current hour.
Returns:
RoutingDecision with model, provider, and reasoning.
"""
if hour is None:
hour = time.localtime().tm_hour
error_rate = get_hour_error_rate(hour)
off_hours = is_off_hours(hour)
# Interactive sessions always use the base model (user can correct errors)
if not is_cron:
return RoutingDecision(
model=base_model or _DEFAULT_CHEAP_MODEL,
provider=base_provider,
reason="Interactive session — user can correct errors",
hour=hour,
error_rate=error_rate,
is_off_hours=off_hours,
)
# Cron jobs during low-error hours: use base model
if not off_hours and error_rate < _ERROR_THRESHOLD:
return RoutingDecision(
model=base_model or _DEFAULT_CHEAP_MODEL,
provider=base_provider,
reason=f"Low-error hours ({hour}:00, {error_rate}% expected)",
hour=hour,
error_rate=error_rate,
is_off_hours=False,
)
# Cron jobs during high-error hours: upgrade to stronger model
if error_rate >= _ERROR_THRESHOLD:
return RoutingDecision(
model=_DEFAULT_STRONG_MODEL,
provider="nous",
reason=f"High-error hours ({hour}:00, {error_rate}% expected) — using stronger model",
hour=hour,
error_rate=error_rate,
is_off_hours=True,
)
# Off-hours but low error: use base model
return RoutingDecision(
model=base_model or _DEFAULT_CHEAP_MODEL,
provider=base_provider,
reason=f"Off-hours but low error ({hour}:00, {error_rate}%)",
hour=hour,
error_rate=error_rate,
is_off_hours=off_hours,
)
def get_routing_report() -> str:
"""Get a report of time-based routing decisions for the next 24 hours."""
lines = ["Time-Aware Model Routing (24h forecast)", "=" * 40, ""]
lines.append(f"Error threshold: {_ERROR_THRESHOLD}%")
lines.append(f"Strong model: {_DEFAULT_STRONG_MODEL}")
lines.append(f"Cheap model: {_DEFAULT_CHEAP_MODEL}")
lines.append("")
for h in range(24):
decision = resolve_time_aware_model(is_cron=True, hour=h)
icon = "\U0001f7e2" if decision.model == _DEFAULT_CHEAP_MODEL else "\U0001f534"
lines.append(f" {h:02d}:00 {icon} {decision.model:25s} ({decision.error_rate}% error)")
return "\n".join(lines)

View File

@@ -1,58 +0,0 @@
"""Tests for time-aware model routing."""
import pytest
import sys
from pathlib import Path
sys.path.insert(0, str(Path(__file__).resolve().parent.parent))
from agent.time_aware_routing import (
resolve_time_aware_model,
get_hour_error_rate,
is_off_hours,
get_routing_report,
)
class TestErrorRates:
def test_evening_high_error(self):
assert get_hour_error_rate(18) == 9.4
assert get_hour_error_rate(19) == 8.1
def test_morning_low_error(self):
assert get_hour_error_rate(9) == 4.0
assert get_hour_error_rate(12) == 4.0
def test_default_for_unknown(self):
assert get_hour_error_rate(15) == 4.0
class TestOffHours:
def test_evening_is_off_hours(self):
assert is_off_hours(20) is True
assert is_off_hours(2) is True
def test_business_hours_not_off(self):
assert is_off_hours(9) is False
assert is_off_hours(14) is False
class TestRouting:
def test_interactive_uses_base_model(self):
d = resolve_time_aware_model("my-model", "my-provider", is_cron=False, hour=18)
assert d.model == "my-model"
assert "Interactive" in d.reason
def test_cron_low_error_uses_base(self):
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=10)
assert d.model == "cheap-model"
def test_cron_high_error_upgrades(self):
d = resolve_time_aware_model("cheap-model", is_cron=True, hour=18)
assert d.model != "cheap-model"
assert d.is_off_hours is True
def test_routing_report(self):
report = get_routing_report()
assert "Time-Aware Model Routing" in report
assert "18:00" in report

View File

@@ -0,0 +1,67 @@
"""
Tests for tool hallucination detection (#922).
"""
import pytest
from tools.tool_validator import ToolHallucinationDetector, ValidationSeverity
class TestToolHallucinationDetector:
def setup_method(self):
self.detector = ToolHallucinationDetector()
self.detector.register_tool("read_file", {
"parameters": {
"type": "object",
"properties": {
"path": {"type": "string"},
"encoding": {"type": "string"},
},
"required": ["path"]
}
})
def test_valid_tool_call(self):
result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt"})
assert result.valid is True
assert len(result.blocking_issues) == 0
def test_unknown_tool(self):
result = self.detector.validate_tool_call("hallucinated_tool", {})
assert result.valid is False
assert any(i.code == "UNKNOWN_TOOL" for i in result.issues)
def test_missing_required_param(self):
result = self.detector.validate_tool_call("read_file", {})
assert result.valid is False
assert any(i.code == "MISSING_REQUIRED" for i in result.issues)
def test_wrong_type(self):
result = self.detector.validate_tool_call("read_file", {"path": 123})
assert result.valid is False
assert any(i.code == "WRONG_TYPE" for i in result.issues)
def test_unknown_param_warning(self):
result = self.detector.validate_tool_call("read_file", {"path": "/tmp/file.txt", "unknown": "value"})
assert result.valid is True # Warning, not blocking
assert any(i.code == "UNKNOWN_PARAM" for i in result.issues)
def test_placeholder_detection(self):
result = self.detector.validate_tool_call("read_file", {"path": "<placeholder>"})
assert any(i.code == "PLACEHOLDER_VALUE" for i in result.issues)
def test_rejection_stats(self):
self.detector.validate_tool_call("unknown_tool", {})
self.detector.validate_tool_call("read_file", {})
stats = self.detector.get_rejection_stats()
assert stats["total"] >= 2
def test_rejection_response(self):
from tools.tool_validator import create_rejection_response
result = self.detector.validate_tool_call("unknown_tool", {})
response = create_rejection_response(result)
assert response["role"] == "tool"
assert "rejected" in response["content"].lower()
if __name__ == "__main__":
pytest.main([__file__])

View File

@@ -44,6 +44,34 @@ from typing import Dict, Any, Optional, Tuple
logger = logging.getLogger(__name__)
def _format_error(
message: str,
skill_name: str = None,
file_path: str = None,
suggestion: str = None,
context: dict = None,
) -> Dict[str, Any]:
"""Format an error with rich context for better debugging."""
parts = [message]
if skill_name:
parts.append(f"Skill: {skill_name}")
if file_path:
parts.append(f"File: {file_path}")
if suggestion:
parts.append(f"Suggestion: {suggestion}")
if context:
for key, value in context.items():
parts.append(f"{key}: {value}")
return {
"success": False,
"error": " | ".join(parts),
"skill_name": skill_name,
"file_path": file_path,
"suggestion": suggestion,
}
# Import security scanner — agent-created skills get the same scrutiny as
# community hub installs.
try:

312
tools/tool_validator.py Normal file
View File

@@ -0,0 +1,312 @@
"""
Poka-Yoke: Tool Hallucination Detection — #922.
Validation firewall between LLM tool-call output and actual execution.
Detects and blocks:
1. Unknown tool names (hallucinated tools)
2. Malformed parameters (wrong types)
3. Missing required arguments
4. Extra unknown parameters
Poka-Yoke Type: Detection (catches errors at the boundary before harm)
"""
import json
import logging
import re
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional, Set
logger = logging.getLogger(__name__)
class ValidationSeverity(Enum):
"""Severity of validation failure."""
BLOCK = "block" # Must block execution
WARN = "warn" # Warning, may proceed
INFO = "info" # Informational
@dataclass
class ValidationIssue:
"""A validation issue found."""
severity: ValidationSeverity
code: str
message: str
tool_name: str
parameter: Optional[str] = None
expected: Optional[str] = None
actual: Optional[Any] = None
@dataclass
class ValidationResult:
"""Result of tool call validation."""
valid: bool
tool_name: str
issues: List[ValidationIssue] = field(default_factory=list)
corrected_args: Optional[Dict[str, Any]] = None
@property
def blocking_issues(self) -> List[ValidationIssue]:
return [i for i in self.issues if i.severity == ValidationSeverity.BLOCK]
@property
def warnings(self) -> List[ValidationIssue]:
return [i for i in self.issues if i.severity == ValidationSeverity.WARN]
class ToolHallucinationDetector:
"""
Poka-yoke detector for tool hallucinations.
Validates tool calls against registered schemas before execution.
"""
def __init__(self, tool_registry: Optional[Dict] = None):
"""
Initialize detector.
Args:
tool_registry: Dict of tool_name -> tool_schema
"""
self.registry = tool_registry or {}
self._rejection_log: List[Dict] = []
def register_tool(self, name: str, schema: Dict):
"""Register a tool with its JSON Schema."""
self.registry[name] = schema
def register_tools(self, tools: Dict[str, Dict]):
"""Register multiple tools."""
self.registry.update(tools)
def validate_tool_call(
self,
tool_name: str,
arguments: Dict[str, Any],
model: str = "unknown",
) -> ValidationResult:
"""
Validate a tool call against the registry.
Args:
tool_name: Name of the tool being called
arguments: Arguments passed to the tool
model: Model that generated the call (for logging)
Returns:
ValidationResult with validation status
"""
issues = []
# 1. Check if tool exists
if tool_name not in self.registry:
issue = ValidationIssue(
severity=ValidationSeverity.BLOCK,
code="UNKNOWN_TOOL",
message=f"Tool '{tool_name}' does not exist. Available: {', '.join(sorted(self.registry.keys())[:10])}...",
tool_name=tool_name,
)
issues.append(issue)
self._log_rejection(tool_name, arguments, model, "UNKNOWN_TOOL")
return ValidationResult(valid=False, tool_name=tool_name, issues=issues)
schema = self.registry[tool_name]
params_schema = schema.get("parameters", {}).get("properties", {})
required = set(schema.get("parameters", {}).get("required", []))
# 2. Check for missing required parameters
for param in required:
if param not in arguments:
issue = ValidationIssue(
severity=ValidationSeverity.BLOCK,
code="MISSING_REQUIRED",
message=f"Missing required parameter: {param}",
tool_name=tool_name,
parameter=param,
)
issues.append(issue)
# 3. Check parameter types
for param_name, param_value in arguments.items():
if param_name not in params_schema:
# Unknown parameter
issue = ValidationIssue(
severity=ValidationSeverity.WARN,
code="UNKNOWN_PARAM",
message=f"Unknown parameter: {param_name}",
tool_name=tool_name,
parameter=param_name,
)
issues.append(issue)
continue
param_schema = params_schema[param_name]
expected_type = param_schema.get("type")
if expected_type and not self._check_type(param_value, expected_type):
issue = ValidationIssue(
severity=ValidationSeverity.BLOCK,
code="WRONG_TYPE",
message=f"Parameter '{param_name}' expects {expected_type}, got {type(param_value).__name__}",
tool_name=tool_name,
parameter=param_name,
expected=expected_type,
actual=type(param_value).__name__,
)
issues.append(issue)
# 4. Check for common hallucination patterns
hallucination_issues = self._detect_hallucination_patterns(tool_name, arguments)
issues.extend(hallucination_issues)
# Determine validity
has_blocking = any(i.severity == ValidationSeverity.BLOCK for i in issues)
if has_blocking:
self._log_rejection(tool_name, arguments, model,
"; ".join(i.code for i in issues if i.severity == ValidationSeverity.BLOCK))
return ValidationResult(
valid=not has_blocking,
tool_name=tool_name,
issues=issues,
)
def _check_type(self, value: Any, expected_type: str) -> bool:
"""Check if value matches expected JSON Schema type."""
type_map = {
"string": str,
"number": (int, float),
"integer": int,
"boolean": bool,
"array": list,
"object": dict,
}
expected = type_map.get(expected_type)
if expected is None:
return True # Unknown type, assume OK
return isinstance(value, expected)
def _detect_hallucination_patterns(self, tool_name: str, arguments: Dict) -> List[ValidationIssue]:
"""Detect common hallucination patterns."""
issues = []
# Pattern 1: Placeholder values
placeholder_patterns = [
r"^<.*>$", # <placeholder>
r"^\[.*\]$", # [placeholder]
r"^TODO$|^FIXME$", # TODO/FIXME
r"^example\.com$", # example.com
r"^127\.0\.0\.1$", # localhost
]
for param_name, param_value in arguments.items():
if isinstance(param_value, str):
for pattern in placeholder_patterns:
if re.match(pattern, param_value, re.IGNORECASE):
issues.append(ValidationIssue(
severity=ValidationSeverity.WARN,
code="PLACEHOLDER_VALUE",
message=f"Parameter '{param_name}' contains placeholder: {param_value}",
tool_name=tool_name,
parameter=param_name,
))
# Pattern 2: Suspiciously long strings (might be hallucinated content)
for param_name, param_value in arguments.items():
if isinstance(param_value, str) and len(param_value) > 10000:
issues.append(ValidationIssue(
severity=ValidationSeverity.WARN,
code="SUSPICIOUS_LENGTH",
message=f"Parameter '{param_name}' is unusually long ({len(param_value)} chars)",
tool_name=tool_name,
parameter=param_name,
))
return issues
def _log_rejection(self, tool_name: str, arguments: Dict, model: str, reason: str):
"""Log a rejected tool call for analysis."""
import time
entry = {
"timestamp": time.time(),
"tool_name": tool_name,
"arguments": {k: str(v)[:100] for k, v in arguments.items()},
"model": model,
"reason": reason,
}
self._rejection_log.append(entry)
# Keep log bounded
if len(self._rejection_log) > 1000:
self._rejection_log = self._rejection_log[-500:]
logger.warning(
"Tool hallucination blocked: tool=%s, model=%s, reason=%s",
tool_name, model, reason
)
def get_rejection_stats(self) -> Dict:
"""Get statistics on rejected tool calls."""
if not self._rejection_log:
return {"total": 0, "by_reason": {}, "by_tool": {}}
by_reason = {}
by_tool = {}
for entry in self._rejection_log:
reason = entry["reason"]
tool = entry["tool_name"]
by_reason[reason] = by_reason.get(reason, 0) + 1
by_tool[tool] = by_tool.get(tool, 0) + 1
return {
"total": len(self._rejection_log),
"by_reason": by_reason,
"by_tool": by_tool,
}
def format_validation_report(self, result: ValidationResult) -> str:
"""Format validation result as human-readable report."""
if result.valid:
return f"{result.tool_name}: valid"
lines = [f"{result.tool_name}: BLOCKED"]
for issue in result.blocking_issues:
lines.append(f" [{issue.code}] {issue.message}")
for issue in result.warnings:
lines.append(f" ⚠️ [{issue.code}] {issue.message}")
return "\n".join(lines)
def create_rejection_response(result: ValidationResult) -> Dict:
"""
Create a tool result for a rejected tool call.
This allows the agent to see the rejection and self-correct.
"""
issues_text = "\n".join(
f"- [{i.code}] {i.message}"
for i in result.blocking_issues
)
return {
"role": "tool",
"content": f"""Tool call rejected: {result.tool_name}
Issues found:
{issues_text}
Please check the tool name and parameters, then try again with valid arguments.""",
}