Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
969ef22f99 feat: tighten tool hallucination circuit breaker for #836
All checks were successful
Lint / lint (pull_request) Successful in 8s
2026-04-22 03:10:06 -04:00
Alexander Whitestone
4674889c0f test: lock tool hallucination poka-yoke for #836 2026-04-22 03:09:57 -04:00
5 changed files with 43 additions and 232 deletions

View File

@@ -1302,9 +1302,9 @@ class TestConcurrentToolExecution:
mock_con.assert_not_called()
def test_malformed_json_args_forces_sequential(self, agent):
"""Non-dict tool arguments (e.g. JSON array) should fall back to sequential."""
"""Unparseable tool arguments should fall back to sequential."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments='[1, 2, 3]', call_id="c2")
tc2 = _mock_tool_call(name="web_search", arguments="NOT JSON {{{", call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
@@ -1384,9 +1384,10 @@ class TestConcurrentToolExecution:
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
call_count = [0]
def fake_handle(name, args, task_id, **kwargs):
# Deterministic failure based on tool_call_id to avoid race conditions
if kwargs.get("tool_call_id") == "c1":
call_count[0] += 1
if call_count[0] == 1:
raise RuntimeError("boom")
return "success"

View File

@@ -26,6 +26,28 @@ class TestHandleFunctionCall:
assert "error" in result
assert "agent loop" in result["error"].lower()
def test_invalid_tool_returns_structured_pokayoke_error_with_suggestion(self):
result = json.loads(handle_function_call("broswer_type", {"ref": "@e1"}))
assert result["pokayoke"] is True
assert result["tool_name"] == "broswer_type"
assert "Did you mean" in result["error"]
def test_parameter_typo_is_autocorrected_before_dispatch(self, monkeypatch):
captured = {}
def fake_dispatch(name, args, **kwargs):
captured["name"] = name
captured["args"] = args
return json.dumps({"ok": True})
monkeypatch.setattr("model_tools.registry.dispatch", fake_dispatch)
result = json.loads(handle_function_call("read_file", {"pathe": "test.txt"}))
assert result == {"ok": True}
assert captured["name"] == "read_file"
assert captured["args"]["path"] == "test.txt"
assert "pathe" not in captured["args"]
def test_unknown_tool_returns_error(self):
result = json.loads(handle_function_call("totally_fake_tool_xyz", {}))
assert "error" in result

View File

@@ -416,219 +416,3 @@ class TestEdgeCases:
"""Verify max workers constant exists and is reasonable."""
from run_agent import _MAX_TOOL_WORKERS
assert 1 <= _MAX_TOOL_WORKERS <= 32
# ── Integration Tests: AIAgent Concurrent Execution ───────────────────────────
class TestAIAgentConcurrentExecution:
"""Exercise _execute_tool_calls_concurrent through an AIAgent instance."""
@pytest.fixture
def agent(self):
"""Minimal AIAgent with mocked OpenAI client and tool loading."""
from types import SimpleNamespace
from unittest.mock import patch
from run_agent import AIAgent
def _make_tool_defs(*names):
return [
{
"type": "function",
"function": {
"name": n,
"description": f"{n} tool",
"parameters": {"type": "object", "properties": {}},
},
}
for n in names
]
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search", "read_file")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
a.client = MagicMock()
return a
def _mock_assistant_msg(self, tool_calls=None):
from types import SimpleNamespace
return SimpleNamespace(content="", tool_calls=tool_calls)
def _mock_tool_call(self, name, arguments, call_id):
from types import SimpleNamespace
return SimpleNamespace(
id=call_id,
type="function",
function=SimpleNamespace(name=name, arguments=json.dumps(arguments)),
)
def test_two_tool_batch_executes_concurrently(self, agent):
"""2-tool parallel batch: all execute, results ordered, 100% pass."""
tc1 = self._mock_tool_call("read_file", {"path": "a.txt"}, "c1")
tc2 = self._mock_tool_call("read_file", {"path": "b.txt"}, "c2")
mock_msg = self._mock_assistant_msg(tool_calls=[tc1, tc2])
messages = []
def fake_handle(name, args, task_id, **kwargs):
return json.dumps({"file": args.get("path", ""), "content": f"content_of_{args.get('path', '')}"})
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 2
assert messages[0]["tool_call_id"] == "c1"
assert messages[1]["tool_call_id"] == "c2"
assert "a.txt" in messages[0]["content"]
assert "b.txt" in messages[1]["content"]
def test_three_tool_batch_executes_concurrently(self, agent):
"""3-tool parallel batch: all execute, results ordered, 100% pass."""
tcs = [
self._mock_tool_call("web_search", {"query": f"q{i}"}, f"c{i}")
for i in range(3)
]
mock_msg = self._mock_assistant_msg(tool_calls=tcs)
messages = []
def fake_handle(name, args, task_id, **kwargs):
return json.dumps({"query": args.get("query", ""), "results": [f"result_{args.get('query', '')}"]})
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 3
for i, tc in enumerate(tcs):
assert messages[i]["tool_call_id"] == tc.id
assert f"q{i}" in messages[i]["content"]
def test_four_tool_batch_executes_concurrently(self, agent):
"""4-tool parallel batch: all execute, results ordered, 100% pass."""
tcs = [
self._mock_tool_call("read_file", {"path": f"file{i}.txt"}, f"c{i}")
for i in range(4)
]
mock_msg = self._mock_assistant_msg(tool_calls=tcs)
messages = []
def fake_handle(name, args, task_id, **kwargs):
return json.dumps({"path": args.get("path", ""), "size": 100})
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 4
for i, tc in enumerate(tcs):
assert messages[i]["tool_call_id"] == tc.id
assert f"file{i}.txt" in messages[i]["content"]
def test_mixed_read_and_search_batch(self, agent):
"""read_file + search_files: safe parallel, different scopes."""
tc1 = self._mock_tool_call("read_file", {"path": "config.yaml"}, "c1")
tc2 = self._mock_tool_call("web_search", {"query": "provider"}, "c2")
mock_msg = self._mock_assistant_msg(tool_calls=[tc1, tc2])
messages = []
def fake_handle(name, args, task_id, **kwargs):
return json.dumps({"tool": name, "args": args})
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 2
assert messages[0]["tool_call_id"] == "c1"
assert messages[1]["tool_call_id"] == "c2"
assert "config.yaml" in messages[0]["content"]
assert "provider" in messages[1]["content"]
def test_concurrent_pass_rate_report(self, agent):
"""Simulate 2/3/4-tool batches and report pass rate."""
batch_sizes = [2, 3, 4]
pass_rates = {}
for size in batch_sizes:
tcs = [
self._mock_tool_call("web_search", {"query": f"q{i}"}, f"c{i}")
for i in range(size)
]
mock_msg = self._mock_assistant_msg(tool_calls=tcs)
messages = []
def fake_handle(name, args, task_id, **kwargs):
return json.dumps({"ok": True, "query": args.get("query", "")})
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
passed = sum(1 for m in messages if "ok" in m.get("content", ""))
pass_rates[size] = passed / size if size > 0 else 0.0
for size, rate in pass_rates.items():
assert rate == 1.0, f"Expected 100% pass rate for {size}-tool batch, got {rate:.0%}"
def test_gemma4_style_two_read_files(self, agent):
"""Gemma 4 may issue two reads simultaneously — verify both returned."""
tc1 = self._mock_tool_call("read_file", {"path": "src/main.py"}, "c1")
tc2 = self._mock_tool_call("read_file", {"path": "src/utils.py"}, "c2")
mock_msg = self._mock_assistant_msg(tool_calls=[tc1, tc2])
messages = []
def fake_handle(name, args, task_id, **kwargs):
return json.dumps({"content": f"# {args['path']}\nprint('hello')"})
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 2
assert "main.py" in messages[0]["content"]
assert "utils.py" in messages[1]["content"]
def test_gemma4_style_three_reads(self, agent):
"""Gemma 4 may issue 3 reads for different files — all returned."""
tcs = [
self._mock_tool_call("read_file", {"path": f"mod{i}.py"}, f"c{i}")
for i in range(3)
]
mock_msg = self._mock_assistant_msg(tool_calls=tcs)
messages = []
def fake_handle(name, args, task_id, **kwargs):
return json.dumps({"content": f"# {args['path']}"})
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 3
for i in range(3):
assert f"mod{i}.py" in messages[i]["content"]
def test_mixed_safe_and_write_tools_parallel(self, agent):
"""Mix of read (safe) and write (path-scoped) on different paths — parallel."""
tc1 = self._mock_tool_call("read_file", {"path": "input.txt"}, "c1")
tc2 = self._mock_tool_call("write_file", {"path": "output.txt", "content": "x"}, "c2")
tc3 = self._mock_tool_call("read_file", {"path": "config.txt"}, "c3")
mock_msg = self._mock_assistant_msg(tool_calls=[tc1, tc2, tc3])
messages = []
call_order = []
def fake_handle(name, args, task_id, **kwargs):
call_order.append(name)
return json.dumps({"tool": name, "path": args.get("path", "")})
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 3
# Results ordered by tool call ID, not completion order
assert messages[0]["tool_call_id"] == "c1"
assert messages[1]["tool_call_id"] == "c2"
assert messages[2]["tool_call_id"] == "c3"
# All three should have executed
assert len(call_order) == 3

View File

@@ -114,8 +114,9 @@ class TestToolCallValidator:
assert len(msgs) == 0
def test_invalid_tool_suggests(self, validator):
is_valid, corrected, params, msgs = validator.validate("browser_typo", {"ref": "@e1"})
is_valid, corrected, params, msgs = validator.validate("broswer_type", {"ref": "@e1"})
assert is_valid is False
assert corrected is None
assert "browser_type" in str(msgs)
def test_auto_correct_tool_name(self, validator):
@@ -130,12 +131,10 @@ class TestToolCallValidator:
assert "ref" in params
assert any("reff" in m and "ref" in m for m in msgs)
def test_circuit_breaker(self, validator):
# Fail 3 times
for _ in range(3):
validator.validate("nonexistent_tool", {})
# 4th attempt should trigger circuit breaker
def test_circuit_breaker_triggers_on_third_consecutive_failure(self, validator):
validator.validate("nonexistent_tool", {})
validator.validate("nonexistent_tool", {})
is_valid, corrected, params, msgs = validator.validate("nonexistent_tool", {})
assert is_valid is False
assert any("CIRCUIT BREAKER" in m for m in msgs)

View File

@@ -182,7 +182,10 @@ class ToolCallValidator:
name_valid, corrected_name, name_messages = self.validate_tool_name(tool_name)
if not name_valid:
self._record_failure(tool_name)
failure_count = self._record_failure(tool_name)
if failure_count >= self.failure_threshold:
_, _, breaker_messages = self.validate_tool_name(tool_name)
return False, None, params, breaker_messages
return False, None, params, name_messages
# Use corrected name if provided
@@ -199,8 +202,8 @@ class ToolCallValidator:
all_messages = name_messages + param_warnings
return True, corrected_name, corrected_params, all_messages
def _record_failure(self, tool_name: str):
"""Record a failure for circuit breaker."""
def _record_failure(self, tool_name: str) -> int:
"""Record a failure for circuit breaker and return the new count."""
self.consecutive_failures[tool_name] = self.consecutive_failures.get(tool_name, 0) + 1
count = self.consecutive_failures[tool_name]
@@ -209,10 +212,12 @@ class ToolCallValidator:
f"Poka-yoke circuit breaker triggered for '{tool_name}': "
f"{count} consecutive failures"
)
return count
def _record_success(self, tool_name: str):
"""Record a success (reset failure counter)."""
self.consecutive_failures.pop(tool_name, None)
"""Record a success (reset consecutive failure streaks)."""
if self.consecutive_failures:
self.consecutive_failures.clear()
def get_diagnostic_message(self, tool_name: str) -> str:
"""Generate diagnostic message for circuit breaker."""