Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
969ef22f99 feat: tighten tool hallucination circuit breaker for #836
All checks were successful
Lint / lint (pull_request) Successful in 8s
2026-04-22 03:10:06 -04:00
Alexander Whitestone
4674889c0f test: lock tool hallucination poka-yoke for #836 2026-04-22 03:09:57 -04:00
8 changed files with 46 additions and 187 deletions

View File

@@ -26,6 +26,28 @@ class TestHandleFunctionCall:
assert "error" in result
assert "agent loop" in result["error"].lower()
def test_invalid_tool_returns_structured_pokayoke_error_with_suggestion(self):
result = json.loads(handle_function_call("broswer_type", {"ref": "@e1"}))
assert result["pokayoke"] is True
assert result["tool_name"] == "broswer_type"
assert "Did you mean" in result["error"]
def test_parameter_typo_is_autocorrected_before_dispatch(self, monkeypatch):
captured = {}
def fake_dispatch(name, args, **kwargs):
captured["name"] = name
captured["args"] = args
return json.dumps({"ok": True})
monkeypatch.setattr("model_tools.registry.dispatch", fake_dispatch)
result = json.loads(handle_function_call("read_file", {"pathe": "test.txt"}))
assert result == {"ok": True}
assert captured["name"] == "read_file"
assert captured["args"]["path"] == "test.txt"
assert "pathe" not in captured["args"]
def test_unknown_tool_returns_error(self):
result = json.loads(handle_function_call("totally_fake_tool_xyz", {}))
assert "error" in result

View File

@@ -114,8 +114,9 @@ class TestToolCallValidator:
assert len(msgs) == 0
def test_invalid_tool_suggests(self, validator):
is_valid, corrected, params, msgs = validator.validate("browser_typo", {"ref": "@e1"})
is_valid, corrected, params, msgs = validator.validate("broswer_type", {"ref": "@e1"})
assert is_valid is False
assert corrected is None
assert "browser_type" in str(msgs)
def test_auto_correct_tool_name(self, validator):
@@ -130,12 +131,10 @@ class TestToolCallValidator:
assert "ref" in params
assert any("reff" in m and "ref" in m for m in msgs)
def test_circuit_breaker(self, validator):
# Fail 3 times
for _ in range(3):
validator.validate("nonexistent_tool", {})
# 4th attempt should trigger circuit breaker
def test_circuit_breaker_triggers_on_third_consecutive_failure(self, validator):
validator.validate("nonexistent_tool", {})
validator.validate("nonexistent_tool", {})
is_valid, corrected, params, msgs = validator.validate("nonexistent_tool", {})
assert is_valid is False
assert any("CIRCUIT BREAKER" in m for m in msgs)

View File

@@ -1,39 +0,0 @@
"""Tests for binary_extensions helpers."""
from tools.binary_extensions import has_binary_extension, has_image_extension
def test_has_image_extension_png():
assert has_image_extension("/tmp/test.png") is True
assert has_image_extension("/tmp/test.PNG") is True
def test_has_image_extension_jpg_variants():
assert has_image_extension("/tmp/test.jpg") is True
assert has_image_extension("/tmp/test.jpeg") is True
assert has_image_extension("/tmp/test.JPG") is True
def test_has_image_extension_webp():
assert has_image_extension("/tmp/test.webp") is True
def test_has_image_extension_gif():
assert has_image_extension("/tmp/test.gif") is True
def test_has_image_extension_no_ext():
assert has_image_extension("/tmp/test") is False
def test_has_image_extension_non_image():
assert has_image_extension("/tmp/test.txt") is False
assert has_image_extension("/tmp/test.exe") is False
assert has_image_extension("/tmp/test.pdf") is False
def test_has_binary_extension_includes_images():
"""All image extensions must also be in binary extensions."""
assert has_binary_extension("/tmp/test.png") is True
assert has_binary_extension("/tmp/test.jpg") is True
assert has_binary_extension("/tmp/test.webp") is True

View File

@@ -294,67 +294,3 @@ class TestSearchHints:
class TestReadFileImageRouting:
"""Tests that image files are routed through vision analysis."""
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_png_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.png"
img.write_bytes(b"fake png data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_jpeg_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.jpeg"
img.write_bytes(b"fake jpeg data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
@patch("tools.file_tools._analyze_image_with_vision")
def test_image_webp_routes_to_vision(self, mock_analyze, tmp_path):
mock_analyze.return_value = json.dumps({"analysis": "test image"})
img = tmp_path / "test.webp"
img.write_bytes(b"fake webp data")
from tools.file_tools import read_file_tool
result = read_file_tool(str(img))
mock_analyze.assert_called_once()
assert json.loads(result)["analysis"] == "test image"
def test_non_image_binary_blocked(self, tmp_path):
from tools.file_tools import read_file_tool
exe = tmp_path / "test.exe"
exe.write_bytes(b"fake exe data")
result = json.loads(read_file_tool(str(exe)))
assert "error" in result
assert "Cannot read binary" in result["error"]
class TestAnalyzeImageWithVision:
"""Tests for the _analyze_image_with_vision helper."""
def test_import_error_fallback(self):
with patch.dict("sys.modules", {"tools.vision_tools": None}):
from tools.file_tools import _analyze_image_with_vision
result = json.loads(_analyze_image_with_vision("/tmp/test.png"))
assert "error" in result
assert "vision_analyze tool is not available" in result["error"]

View File

@@ -34,22 +34,9 @@ BINARY_EXTENSIONS = frozenset({
})
IMAGE_EXTENSIONS = frozenset({
".png", ".jpg", ".jpeg", ".gif", ".bmp", ".ico", ".webp", ".tiff", ".tif",
})
def has_binary_extension(path: str) -> bool:
"""Check if a file path has a binary extension. Pure string check, no I/O."""
dot = path.rfind(".")
if dot == -1:
return False
return path[dot:].lower() in BINARY_EXTENSIONS
def has_image_extension(path: str) -> bool:
"""Check if a file path has an image extension. Pure string check, no I/O."""
dot = path.rfind(".")
if dot == -1:
return False
return path[dot:].lower() in IMAGE_EXTENSIONS

View File

@@ -1893,13 +1893,11 @@ def browser_get_images(task_id: Optional[str] = None) -> str:
def browser_vision(question: str, annotate: bool = False, task_id: Optional[str] = None) -> str:
"""
Take a screenshot of the current page and analyze it with vision AI.
This tool captures what's visually displayed in the browser and sends it
to the configured vision model for analysis. When the active model is
natively multimodal (e.g. Gemma 4) it is used directly; otherwise the
auxiliary vision backend is used. Useful for understanding visual content
that the text-based snapshot may not capture (CAPTCHAs, verification
challenges, images, complex layouts, etc.).
to Gemini for analysis. Useful for understanding visual content that the
text-based snapshot may not capture (CAPTCHAs, verification challenges,
images, complex layouts, etc.).
The screenshot is saved persistently and its file path is returned alongside
the analysis, so it can be shared with users via MEDIA:<path> in the response.

View File

@@ -7,7 +7,7 @@ import logging
import os
import threading
from pathlib import Path
from tools.binary_extensions import has_binary_extension, has_image_extension
from tools.binary_extensions import has_binary_extension
from tools.file_operations import ShellFileOperations
from agent.redact import redact_sensitive_text
@@ -279,52 +279,6 @@ def clear_file_ops_cache(task_id: str = None):
_file_ops_cache.clear()
def _analyze_image_with_vision(image_path: str, task_id: str = "default") -> str:
"""Route an image file through the vision analysis pipeline.
Uses vision_analyze_tool with a default descriptive prompt. Falls back
to a manual error when no vision backend is available.
"""
import asyncio
try:
from tools.vision_tools import vision_analyze_tool
except ImportError:
return json.dumps({
"error": (
f"Image file '{image_path}' detected but vision_analyze tool "
"is not available. Use vision_analyze directly if configured."
),
})
prompt = (
"Describe this image in detail. If it contains text, transcribe "
"the text. If it is a diagram, chart, or UI screenshot, describe "
"the layout, colors, labels, and any visible data."
)
try:
result = asyncio.run(vision_analyze_tool(image_url=image_path, question=prompt))
except Exception as exc:
return json.dumps({
"error": (
f"Image file '{image_path}' detected but vision analysis failed: {exc}. "
"Use vision_analyze directly if configured."
),
})
try:
parsed = json.loads(result)
except json.JSONDecodeError:
parsed = {"content": result}
# Wrap the vision result so the caller knows it came from image analysis
return json.dumps({
"image_path": image_path,
"analysis": parsed.get("content") or parsed.get("analysis") or result,
"source": "vision_analyze",
}, ensure_ascii=False)
def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str = "default") -> str:
"""Read a file with pagination and line numbers."""
try:
@@ -341,13 +295,10 @@ def read_file_tool(path: str, offset: int = 1, limit: int = 500, task_id: str =
_resolved = Path(path).expanduser().resolve()
# ── Binary / image file guard ─────────────────────────────────
# Block binary files by extension (no I/O). Images are routed
# through the vision analysis pipeline when a backend is available.
# ── Binary file guard ─────────────────────────────────────────
# Block binary files by extension (no I/O).
if has_binary_extension(str(_resolved)):
_ext = _resolved.suffix.lower()
if has_image_extension(str(_resolved)):
return _analyze_image_with_vision(str(_resolved), task_id=task_id)
return json.dumps({
"error": (
f"Cannot read binary file '{path}' ({_ext}). "
@@ -778,7 +729,7 @@ def _check_file_reqs():
READ_FILE_SCHEMA = {
"name": "read_file",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Image files (PNG, JPEG, WebP, GIF, etc.) are automatically analyzed via vision_analyze. Other binary files cannot be read as text.",
"description": "Read a text file with line numbers and pagination. Use this instead of cat/head/tail in terminal. Output format: 'LINE_NUM|CONTENT'. Suggests similar filenames if not found. Use offset and limit for large files. Reads exceeding ~100K characters are rejected; use offset and limit to read specific sections of large files. NOTE: Cannot read images or binary files — use vision_analyze for images.",
"parameters": {
"type": "object",
"properties": {

View File

@@ -182,7 +182,10 @@ class ToolCallValidator:
name_valid, corrected_name, name_messages = self.validate_tool_name(tool_name)
if not name_valid:
self._record_failure(tool_name)
failure_count = self._record_failure(tool_name)
if failure_count >= self.failure_threshold:
_, _, breaker_messages = self.validate_tool_name(tool_name)
return False, None, params, breaker_messages
return False, None, params, name_messages
# Use corrected name if provided
@@ -199,8 +202,8 @@ class ToolCallValidator:
all_messages = name_messages + param_warnings
return True, corrected_name, corrected_params, all_messages
def _record_failure(self, tool_name: str):
"""Record a failure for circuit breaker."""
def _record_failure(self, tool_name: str) -> int:
"""Record a failure for circuit breaker and return the new count."""
self.consecutive_failures[tool_name] = self.consecutive_failures.get(tool_name, 0) + 1
count = self.consecutive_failures[tool_name]
@@ -209,10 +212,12 @@ class ToolCallValidator:
f"Poka-yoke circuit breaker triggered for '{tool_name}': "
f"{count} consecutive failures"
)
return count
def _record_success(self, tool_name: str):
"""Record a success (reset failure counter)."""
self.consecutive_failures.pop(tool_name, None)
"""Record a success (reset consecutive failure streaks)."""
if self.consecutive_failures:
self.consecutive_failures.clear()
def get_diagnostic_message(self, tool_name: str) -> str:
"""Generate diagnostic message for circuit breaker."""