Compare commits


1 Commit

Author SHA1 Message Date
Alexander Whitestone
f88e57bcfe feat: route image files through vision analysis (Gemma 4 multimodal)
- tools/binary_extensions.py: add IMAGE_EXTENSIONS + has_image_extension()
- tools/file_tools.py: detect image files in read_file_tool and auto-route
them to vision_analyze_tool instead of returning a binary-file error; wrap
the vision result so callers know it came from image analysis.
- tools/browser_tool.py: update browser_vision docstring to document that
natively multimodal models (e.g. Gemma 4) are used directly when available.
- tests/tools/test_binary_extensions.py: new tests for image extension helpers
- tests/tools/test_file_tools.py: add TestReadFileImageRouting for
PNG/JPEG/WebP auto-routing and TestAnalyzeImageWithVision for fallback coverage.

Closes #800
2026-04-22 02:54:18 -04:00
9 changed files with 175 additions and 510 deletions
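Taken together, the change means reading an image path now produces a vision analysis instead of an error. A minimal sketch of the new call path (illustrative only; it assumes the read_file_tool signature and the wrapped JSON shape shown in the diffs below):

import json
from tools.file_tools import read_file_tool

# Images are auto-routed through vision analysis; the wrapper marks the source.
result = json.loads(read_file_tool("/tmp/screenshot.png"))  # hypothetical path
if result.get("source") == "vision_analyze":
    print(result["image_path"], "->", result["analysis"])
else:
    print(result.get("error", result))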

View File

@@ -1,281 +0,0 @@
"""
Hallucination Metrics — Persistent logging and alerting for tool hallucinations.
Logs tool hallucination events to a JSONL file and provides aggregated statistics.
Integrates with the poka-yoke validation system.
Usage:
from agent.hallucination_metrics import log_hallucination_event, get_hallucination_stats
log_hallucination_event("invalid_tool", "unknown_tool", "suggested_correct_name")
stats = get_hallucination_stats()
"""
import json
import logging
import os
import time
from collections import Counter, defaultdict
from datetime import datetime, timezone
from pathlib import Path
from threading import Lock
from typing import Any, Dict, List, Optional, Tuple
from hermes_constants import get_hermes_home
logger = logging.getLogger(__name__)
# Constants
METRICS_FILE_NAME = "hallucination_metrics.jsonl"
ALERT_THRESHOLD = 10  # Alert after this many failures for a tool within a session
SESSION_WINDOW_HOURS = 24 # Consider events within this window as "session"
# In-memory cache for fast lookups
_cache: Dict[str, Any] = {"events": [], "last_flush": 0, "session_counts": defaultdict(int)}
_cache_lock = Lock()
def _get_metrics_path() -> Path:
"""Return the path to the hallucination metrics file."""
return get_hermes_home() / "metrics" / METRICS_FILE_NAME
def _ensure_metrics_dir():
"""Ensure the metrics directory exists."""
metrics_dir = _get_metrics_path().parent
metrics_dir.mkdir(parents=True, exist_ok=True)
def log_hallucination_event(
tool_name: str,
error_type: str = "unknown_tool",
suggested_name: Optional[str] = None,
validation_messages: Optional[List[str]] = None,
session_id: Optional[str] = None,
) -> Dict[str, Any]:
"""
Log a hallucination event to the metrics file.
Args:
tool_name: The hallucinated tool name
error_type: Type of error (unknown_tool, invalid_params, etc.)
suggested_name: Suggested correction if available
validation_messages: List of validation error messages
session_id: Optional session identifier for grouping
Returns:
The logged event dict with additional metadata
"""
event = {
"timestamp": datetime.now(timezone.utc).isoformat(),
"tool_name": tool_name,
"error_type": error_type,
"suggested_name": suggested_name,
"validation_messages": validation_messages or [],
"session_id": session_id,
"unix_timestamp": time.time(),
}
# Write to file
_ensure_metrics_dir()
metrics_path = _get_metrics_path()
try:
with open(metrics_path, "a", encoding="utf-8") as f:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
except Exception as e:
logger.warning(f"Failed to write hallucination event: {e}")
# Update in-memory cache
with _cache_lock:
_cache["events"].append(event)
_cache["session_counts"][tool_name] += 1
session_count = _cache["session_counts"][tool_name]
# Check alert threshold
if session_count >= ALERT_THRESHOLD:
logger.warning(
f"HALLUCINATION ALERT: Tool '{tool_name}' has failed {session_count} times "
f"in this session (threshold: {ALERT_THRESHOLD}). "
f"This may indicate a persistent hallucination pattern."
)
return event
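# Example of a JSONL record produced above (tool names and timestamps are
# hypothetical):
# {"timestamp": "2026-04-22T06:54:18+00:00", "tool_name": "web_serch",
#  "error_type": "unknown_tool", "suggested_name": "web_search",
#  "validation_messages": [], "session_id": null, "unix_timestamp": 1776840858.0}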
def _load_events_from_file() -> List[Dict[str, Any]]:
"""Load all events from the metrics file."""
metrics_path = _get_metrics_path()
if not metrics_path.exists():
return []
events = []
try:
with open(metrics_path, "r", encoding="utf-8") as f:
for line in f:
line = line.strip()
if line:
try:
events.append(json.loads(line))
except json.JSONDecodeError:
continue
except Exception as e:
logger.warning(f"Failed to load hallucination events: {e}")
return events
def get_hallucination_stats(
hours: Optional[int] = None,
tool_name: Optional[str] = None,
) -> Dict[str, Any]:
"""
Get aggregated hallucination statistics.
Args:
hours: Only consider events from the last N hours (None = all time)
tool_name: Filter to specific tool name (None = all tools)
Returns:
Dict with aggregated statistics
"""
events = _load_events_from_file()
# Filter by time window
if hours is not None:
cutoff = time.time() - (hours * 3600)
events = [e for e in events if e.get("unix_timestamp", 0) >= cutoff]
# Filter by tool name
if tool_name is not None:
events = [e for e in events if e.get("tool_name") == tool_name]
# Aggregate by tool
tool_counts: Dict[str, Dict[str, Any]] = defaultdict(
lambda: {"count": 0, "suggested_names": [], "error_types": defaultdict(int)}
)
for event in events:
name = event.get("tool_name", "unknown")
tool_counts[name]["count"] += 1
if event.get("suggested_name"):
tool_counts[name]["suggested_names"].append(event["suggested_name"])
if event.get("error_type"):
tool_counts[name]["error_types"][event["error_type"]] += 1
# Find most common suggestions per tool
for name, data in tool_counts.items():
suggestions = data["suggested_names"]
if suggestions:
most_common = Counter(suggestions).most_common(1)[0]
data["most_common_suggestion"] = most_common[0]
data["suggestion_count"] = most_common[1]
del data["suggested_names"] # Remove raw list from output
# Calculate time-based stats
if events:
first_event = min(e.get("unix_timestamp", 0) for e in events)
last_event = max(e.get("unix_timestamp", 0) for e in events)
time_span_hours = (last_event - first_event) / 3600 if first_event != last_event else 0
else:
time_span_hours = 0
# Error type breakdown
all_error_types: Dict[str, int] = defaultdict(int)
for event in events:
et = event.get("error_type", "unknown")
all_error_types[et] += 1
return {
"total_events": len(events),
"unique_tools": len(tool_counts),
"time_span_hours": round(time_span_hours, 1),
"top_hallucinated_tools": sorted(
[{"tool": k, **v} for k, v in tool_counts.items()],
key=lambda x: -x["count"]
)[:20],
"error_type_breakdown": dict(all_error_types),
"alert_threshold": ALERT_THRESHOLD,
"session_window_hours": SESSION_WINDOW_HOURS,
}
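# Illustrative return shape (values hypothetical, keys as assembled above):
# {"total_events": 3, "unique_tools": 2, "time_span_hours": 0.5,
#  "top_hallucinated_tools": [{"tool": "web_serch", "count": 2,
#   "error_types": {"unknown_tool": 2}, "most_common_suggestion": "web_search",
#   "suggestion_count": 2}],
#  "error_type_breakdown": {"unknown_tool": 2, "invalid_params": 1},
#  "alert_threshold": 10, "session_window_hours": 24}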
def get_most_hallucinated_tools(n: int = 10) -> List[Tuple[str, int]]:
"""Get the top N most frequently hallucinated tool names."""
stats = get_hallucination_stats()
tools = stats.get("top_hallucinated_tools", [])
return [(t["tool"], t["count"]) for t in tools[:n]]
def clear_metrics(older_than_hours: Optional[int] = None) -> int:
"""
Clear hallucination metrics.
Args:
older_than_hours: Only clear events older than this many hours (None = clear all)
Returns:
Number of events removed
"""
metrics_path = _get_metrics_path()
if not metrics_path.exists():
return 0
if older_than_hours is None:
# Clear all
count = len(_load_events_from_file())
metrics_path.unlink(missing_ok=True)
with _cache_lock:
_cache["events"].clear()
_cache["session_counts"].clear()
return count
# Clear only old events
cutoff = time.time() - (older_than_hours * 3600)
events = _load_events_from_file()
keep = [e for e in events if e.get("unix_timestamp", 0) >= cutoff]
removed = len(events) - len(keep)
# Rewrite file
_ensure_metrics_dir()
with open(metrics_path, "w", encoding="utf-8") as f:
for event in keep:
f.write(json.dumps(event, ensure_ascii=False) + "\n")
return removed
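# Usage sketch: clear_metrics() removes the whole file and the in-memory cache;
# clear_metrics(older_than_hours=24) rewrites the file keeping only the last
# 24 hours of events and returns the number removed.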
def format_stats_for_display(stats: Dict[str, Any]) -> str:
"""Format statistics as a human-readable string."""
lines = [
"=== Hallucination Metrics ===",
"",
f"Total events: {stats['total_events']}",
f"Unique tools hallucinated: {stats['unique_tools']}",
f"Time span: {stats['time_span_hours']:.1f} hours",
"",
"Top Hallucinated Tools:",
"-" * 40,
]
for tool in stats.get("top_hallucinated_tools", [])[:10]:
lines.append(f" {tool['tool']:<30} {tool['count']:>5} events")
if "most_common_suggestion" in tool:
lines.append(f" → Suggested: {tool['most_common_suggestion']} ({tool['suggestion_count']}x)")
if stats.get("error_type_breakdown"):
lines.extend([
"",
"Error Types:",
"-" * 40,
])
for et, count in sorted(stats["error_type_breakdown"].items(), key=lambda x: -x[1]):
lines.append(f" {et:<30} {count:>5}")
lines.extend([
"",
f"Alert threshold: {stats['alert_threshold']} failures per session",
f"Session window: {stats['session_window_hours']} hours",
])
return "\n".join(lines)
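# Sample rendered output (abridged; tool names hypothetical):
# === Hallucination Metrics ===
#
# Total events: 3
# Unique tools hallucinated: 2
# Time span: 0.5 hours
#
# Top Hallucinated Tools:
# ----------------------------------------
#   web_serch                          2 events
#     → Suggested: web_search (2x)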

View File

@@ -18,7 +18,6 @@ Usage:
hermes cron list # List cron jobs
hermes cron status # Check if cron scheduler is running
hermes doctor # Check configuration and dependencies
hermes hallucination-stats # Show tool hallucination statistics
hermes honcho setup # Configure Honcho AI memory integration
hermes honcho status # Show Honcho config and connection status
hermes honcho sessions # List directory → session name mappings
@@ -2805,17 +2804,6 @@ def cmd_doctor(args):
run_doctor(args)
def cmd_hallucination_stats(args):
"""Show tool hallucination statistics."""
from agent.hallucination_metrics import get_hallucination_stats, format_stats_for_display, clear_metrics
if getattr(args, 'clear', False):
removed = clear_metrics(older_than_hours=getattr(args, 'older_than', None))
print(f"Cleared {removed} hallucination events.")
return
stats = get_hallucination_stats(hours=getattr(args, 'hours', None))
print(format_stats_for_display(stats))
def cmd_dump(args):
"""Dump setup summary for support/debugging."""
from hermes_cli.dump import run_dump
@@ -5053,33 +5041,6 @@ For more help on a command:
)
doctor_parser.set_defaults(func=cmd_doctor)
# =========================================================================
# hallucination-stats command
# =========================================================================
hallucination_parser = subparsers.add_parser(
"hallucination-stats",
help="Show tool hallucination statistics",
description="View aggregated tool hallucination metrics from poka-yoke validation"
)
hallucination_parser.add_argument(
"--hours",
type=int,
default=None,
help="Only show events from the last N hours"
)
hallucination_parser.add_argument(
"--clear",
action="store_true",
help="Clear all hallucination metrics"
)
hallucination_parser.add_argument(
"--older-than",
type=int,
default=None,
help="When clearing, only remove events older than N hours"
)
hallucination_parser.set_defaults(func=cmd_hallucination_stats)
# =========================================================================
# dump command
# =========================================================================

View File

@@ -1,171 +0,0 @@
"""Tests for agent/hallucination_metrics.py — #853."""
import json
import time
from pathlib import Path
import pytest
from agent.hallucination_metrics import (
log_hallucination_event,
get_hallucination_stats,
get_most_hallucinated_tools,
clear_metrics,
format_stats_for_display,
_get_metrics_path,
)
@pytest.fixture(autouse=True)
def isolated_metrics(monkeypatch, tmp_path):
"""Redirect metrics to a temp file for every test."""
metrics_dir = tmp_path / "test_hermes_home" / "metrics"
metrics_dir.mkdir(parents=True)
metrics_file = metrics_dir / "hallucination_metrics.jsonl"
# Patch the get_hermes_home function to return our temp path
def mock_get_hermes_home():
return tmp_path / "test_hermes_home"
monkeypatch.setattr(
"agent.hallucination_metrics.get_hermes_home",
mock_get_hermes_home,
)
# Also clear cache
from agent.hallucination_metrics import _cache, _cache_lock
with _cache_lock:
_cache["events"].clear()
_cache["session_counts"].clear()
yield
clear_metrics()
class TestLogEvent:
def test_log_event_returns_dict(self):
event = log_hallucination_event("fake_tool", "unknown_tool", "real_tool")
assert event["tool_name"] == "fake_tool"
assert event["error_type"] == "unknown_tool"
assert event["suggested_name"] == "real_tool"
assert "timestamp" in event
assert "unix_timestamp" in event
def test_log_event_persists_to_file(self):
log_hallucination_event("tool_a", "unknown_tool")
log_hallucination_event("tool_b", "invalid_params")
path = _get_metrics_path()
assert path.exists()
lines = path.read_text().strip().splitlines()
assert len(lines) == 2
data = [json.loads(line) for line in lines]
assert data[0]["tool_name"] == "tool_a"
assert data[1]["tool_name"] == "tool_b"
class TestGetStats:
def test_empty_stats(self):
stats = get_hallucination_stats()
assert stats["total_events"] == 0
assert stats["unique_tools"] == 0
def test_stats_by_tool(self):
log_hallucination_event("tool_x", "unknown_tool", "tool_y")
log_hallucination_event("tool_x", "unknown_tool", "tool_y")
log_hallucination_event("tool_z", "invalid_params")
stats = get_hallucination_stats()
assert stats["total_events"] == 3
assert stats["unique_tools"] == 2
top = stats["top_hallucinated_tools"]
assert len(top) == 2
assert top[0]["tool"] == "tool_x"
assert top[0]["count"] == 2
assert top[1]["tool"] == "tool_z"
assert top[1]["count"] == 1
def test_stats_hours_filter(self):
# Log old event by faking timestamp
old_event = {
"timestamp": "2026-01-01T00:00:00+00:00",
"tool_name": "old_tool",
"error_type": "unknown_tool",
"unix_timestamp": time.time() - 48 * 3600,
}
path = _get_metrics_path()
path.parent.mkdir(parents=True, exist_ok=True)
with open(path, "w") as f:
f.write(json.dumps(old_event) + "\n")
log_hallucination_event("new_tool", "unknown_tool")
stats = get_hallucination_stats(hours=24)
assert stats["total_events"] == 1
assert stats["top_hallucinated_tools"][0]["tool"] == "new_tool"
def test_error_type_breakdown(self):
log_hallucination_event("t1", "unknown_tool")
log_hallucination_event("t2", "invalid_params")
log_hallucination_event("t3", "unknown_tool")
stats = get_hallucination_stats()
breakdown = stats["error_type_breakdown"]
assert breakdown["unknown_tool"] == 2
assert breakdown["invalid_params"] == 1
class TestGetMostHallucinated:
def test_top_tools(self):
for _ in range(5):
log_hallucination_event("common_tool", "unknown_tool")
for _ in range(2):
log_hallucination_event("rare_tool", "unknown_tool")
tools = get_most_hallucinated_tools(n=2)
assert tools[0] == ("common_tool", 5)
assert tools[1] == ("rare_tool", 2)
class TestClearMetrics:
def test_clear_all(self):
log_hallucination_event("t1", "unknown_tool")
removed = clear_metrics()
assert removed == 1
assert _get_metrics_path().exists() is False
def test_clear_older_than(self):
path = _get_metrics_path()
path.parent.mkdir(parents=True, exist_ok=True)
old = {"tool_name": "old", "unix_timestamp": time.time() - 48 * 3600}
new = {"tool_name": "new", "unix_timestamp": time.time()}
with open(path, "w") as f:
f.write(json.dumps(old) + "\n")
f.write(json.dumps(new) + "\n")
removed = clear_metrics(older_than_hours=24)
assert removed == 1
remaining = get_hallucination_stats()
assert remaining["total_events"] == 1
class TestFormatDisplay:
def test_format_includes_headers(self):
log_hallucination_event("bad_tool", "unknown_tool", "good_tool")
stats = get_hallucination_stats()
text = format_stats_for_display(stats)
assert "Hallucination Metrics" in text
assert "bad_tool" in text
assert "Total events: 1" in text
class TestAlertThreshold:
def test_alert_after_threshold(self, monkeypatch, caplog):
monkeypatch.setattr("agent.hallucination_metrics.ALERT_THRESHOLD", 3)
for _ in range(4):
log_hallucination_event("persistent_tool", "unknown_tool")
assert "HALLUCINATION ALERT" in caplog.text
assert "persistent_tool" in caplog.text

View File

@@ -0,0 +1,39 @@
"""Tests for binary_extensions helpers."""
from tools.binary_extensions import has_binary_extension, has_image_extension
def test_has_image_extension_png():
assert has_image_extension("/tmp/test.png") is True
assert has_image_extension("/tmp/test.PNG") is True
def test_has_image_extension_jpg_variants():
assert has_image_extension("/tmp/test.jpg") is True
assert has_image_extension("/tmp/test.jpeg") is True
assert has_image_extension("/tmp/test.JPG") is True
def test_has_image_extension_webp():
assert has_image_extension("/tmp/test.webp") is True
def test_has_image_extension_gif():
assert has_image_extension("/tmp/test.gif") is True
def test_has_image_extension_no_ext():
assert has_image_extension("/tmp/test") is False
def test_has_image_extension_non_image():
assert has_image_extension("/tmp/test.txt") is False
assert has_image_extension("/tmp/test.exe") is False
assert has_image_extension("/tmp/test.pdf") is False
def test_has_binary_extension_includes_images():
"""All image extensions must also be in binary extensions."""
assert has_binary_extension("/tmp/test.png") is True
assert has_binary_extension("/tmp/test.jpg") is True
assert has_binary_extension("/tmp/test.webp") is True

View File

@@ -294,3 +294,67 @@ class TestSearchHints:
class TestReadFileImageRouting:
"""Tests that image files are routed through vision analysis."""
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_png_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.png"
img.write_bytes(b"fake png data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_jpeg_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.jpeg"
img.write_bytes(b"fake jpeg data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_webp_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.webp"
img.write_bytes(b"fake webp data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
def test_non_image_binary_blocked(self, tmp_path):
from tools.file_tools import read_file_tool
exe = tmp_path / "test.exe"
exe.write_bytes(b"fake exe data")
result = json.loads(read_file_tool(str(exe)))
assert "error" in result
assert "Cannot read binary" in result["error"]
class TestAnalyzeImageWithVision:
"""Tests for the _analyze_image_with_vision helper."""
def test_import_error_fallback(self):
with patch.dict("sys.modules", {"tools.vision_tools": None}):
from tools.file_tools import _analyze_image_with_vision
result = json.loads(_analyze_image_with_vision("/tmp/test.png"))
assert "error" in result
assert "vision_analyze tool is not available" in result["error"]

View File

@@ -34,9 +34,22 @@ BINARY_EXTENSIONS = frozenset({
})
IMAGE_EXTENSIONS = frozenset({
".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico", ".webp", ".tiff", ".tif",
})
def has_binary_extension(path: str) -> bool:
"""Check if a file path has a binary extension. Pure string check, no I/O."""
dot = path.rfind(".")
if dot == -1:
return False
return path[dot:].lower() in BINARY_EXTENSIONS
def has_image_extension(path: str) -> bool:
"""Check if a file path has an image extension. Pure string check, no I/O."""
dot = path.rfind(".")
if dot == -1:
return False
return path[dot:].lower() in IMAGE_EXTENSIONS
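# Quick sanity checks for the helpers above (pure string matching, no I/O;
# the paths are hypothetical):
# has_image_extension("photo.JPG")   -> True   (case-insensitive)
# has_image_extension("notes.txt")   -> False
# has_image_extension("Makefile")    -> False  (no extension)
# has_binary_extension("photo.JPG")  -> True   (image extensions are a subset)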

View File

@@ -1893,11 +1893,13 @@ def browser_get_images(task_id: Optional[str] = None) -> str:
def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] = None) -> str:
"""
Take a screenshot of the current page and analyze it with vision AI.
This tool captures what's visually displayed in the browser and sends it
to Gemini for analysis. Useful for understanding visual content that the
text-based snapshot may not capture (CAPTCHAs, verification challenges,
images, complex layouts, etc.).
to the configured vision model for analysis. When the active model is
natively multimodal (e.g. Gemma 4) it is used directly; otherwise the
auxiliary vision backend is used. Useful for understanding visual content
that the text-based snapshot may not capture (CAPTCHAs, verification
challenges, images, complex layouts, etc.).
The screenshot is saved persistently and its file path is returned alongside
the analysis, so it can be shared with users via MEDIA:<path> in the response.
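# Usage sketch for the tool documented above (assumes an active browser
# session; annotate and task_id are optional per the signature):
# answer = browser_vision("Is a CAPTCHA visible on this page?", annotate=True)
# The return value includes the analysis and the saved screenshot path, which
# can be surfaced to users via MEDIA:<path>.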

View File

@@ -7,7 +7,7 @@ import logging
import os
import threading
from pathlib import Path
from tools.binary_extensions import has_binary_extension
from tools.binary_extensions import has_binary_extension, has_image_extension
from tools.file_operations import ShellFileOperations
from agent.redact import redact_sensitive_text
@@ -279,6 +279,52 @@ def clear_file_ops_cache(task_id: str = None):
_file_ops_cache.clear()
def _analyze_image_with_vision(image_path: str, task_id: str = "default") -> str:
"""Route an image file through the vision analysis pipeline.
Uses vision_analyze_tool with a default descriptive prompt. Falls back
to a structured JSON error when no vision backend is available.
"""
import asyncio
try:
from tools.vision_tools import vision_analyze_tool
except ImportError:
return json.dumps({
"error": (
f"Image file '{image_path}' detected but vision_analyze tool "
"is not available. Use vision_analyze directly if configured."
),
})
prompt = (
"Describe this image in detail. If it contains text, transcribe "
"the text. If it is a diagram, chart, or UI screenshot, describe "
"the layout, colors, labels, and any visible data."
)
try:
result = asyncio.run(vision_analyze_tool(image_url=image_path, question=prompt))
except Exception as exc:
return json.dumps({
"error": (
f"Image file '{image_path}' detected but vision analysis failed: {exc}. "
"Use vision_analyze directly if configured."
),
})
try:
parsed = json.loads(result)
except json.JSONDecodeError:
parsed = {"content": result}
# Wrap the vision result so the caller knows it came from image analysis
return json.dumps({
"image_path": image_path,
"analysis": parsed.get("content") or parsed.get("analysis") or result,
"source": "vision_analyze",
}, ensure_ascii=False)
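# Illustrative wrapped result (values hypothetical; keys as constructed above):
# {"image_path": "/home/user/chart.png",
#  "analysis": "A bar chart with three labeled series ...",
#  "source": "vision_analyze"}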
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
"""Read a file with pagination and line numbers."""
try:
@@ -295,10 +341,13 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
_resolved = Path(path).expanduser().resolve()
# ── Binary file guard ─────────────────────────────────────────
# Block binary files by extension (no I/O).
# ── Binary / image file guard ─────────────────────────────────
# Block binary files by extension (no I/O). Images are routed
# through the vision analysis pipeline when a backend is available.
if has_binary_extension(str(_resolved)):
_ext = _resolved.suffix.lower()
if has_image_extension(str(_resolved)):
return _analyze_image_with_vision(str(_resolved), task_id=task_id)
return json.dumps({
"error": (
f"Cannot read binary file '{path}' ({_ext}). "
@@ -729,7 +778,7 @@ def _check_file_reqs():
READ_FILE_SCHEMA = {
"name": "read_file",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Image files (PNG, JPEG, WebP, GIF, etc.) are automatically analyzed via vision_analyze. Other binary files cannot be read as text.",
"parameters": {
"type": "object",
"properties": {

View File

@@ -204,17 +204,6 @@ class ToolCallValidator:
self.consecutive_failures[tool_name] = self.consecutive_failures.get(tool_name, 0) + 1
count = self.consecutive_failures[tool_name]
# Log to persistent metrics
try:
from agent.hallucination_metrics import log_hallucination_event
log_hallucination_event(
tool_name=tool_name,
error_type="unknown_tool",
suggested_name=None,
)
except Exception:
pass # Best-effort metrics logging
if count >= self.failure_threshold:
logger.warning(
f"Poka-yoke circuit breaker triggered for '{tool_name}': "