When finish_reason='length' and the response contains only reasoning (think blocks or empty content), the model exhausted its output token budget on thinking with nothing left for the actual response. Previously, this fell into either: - chat_completions: 3 useless continuation retries (model hits same limit) - anthropic/codex: generic 'Response truncated' error with rollback Now: detect the think-only + length condition early and return immediately with a targeted error message: 'Model used all output tokens on reasoning with none left for the response. Try lowering reasoning effort or increasing max_tokens.' This saves 2 wasted API calls on the chat_completions path and gives users actionable guidance instead of a cryptic error. The existing think-only retry logic (finish_reason='stop') is unchanged — that's a genuine model glitch where retrying can help.
3150 lines
127 KiB
Python
3150 lines
127 KiB
Python
"""Unit tests for run_agent.py (AIAgent).
|
|
|
|
Tests cover pure functions, state/structure methods, and conversation loop
|
|
pieces. The OpenAI client and tool loading are mocked so no network calls
|
|
are made.
|
|
"""
|
|
|
|
import json
|
|
import logging
|
|
import re
|
|
import uuid
|
|
from logging.handlers import RotatingFileHandler
|
|
from pathlib import Path
|
|
from types import SimpleNamespace
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
import pytest
|
|
|
|
import run_agent
|
|
from honcho_integration.client import HonchoClientConfig
|
|
from run_agent import AIAgent, _inject_honcho_turn_context
|
|
from agent.prompt_builder import DEFAULT_AGENT_IDENTITY
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Fixtures
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _make_tool_defs(*names: str) -> list:
|
|
"""Build minimal tool definition list accepted by AIAgent.__init__."""
|
|
return [
|
|
{
|
|
"type": "function",
|
|
"function": {
|
|
"name": n,
|
|
"description": f"{n} tool",
|
|
"parameters": {"type": "object", "properties": {}},
|
|
},
|
|
}
|
|
for n in names
|
|
]
|
|
|
|
|
|
@pytest.fixture()
def agent():
    """Minimal AIAgent with mocked OpenAI client and tool loading."""
    # Patch out tool discovery, toolset checks and the OpenAI SDK so the
    # constructor runs without touching the network or the filesystem.
    tool_patch = patch(
        "run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")
    )
    requirements_patch = patch(
        "run_agent.check_toolset_requirements", return_value={}
    )
    with tool_patch, requirements_patch, patch("run_agent.OpenAI"):
        instance = AIAgent(
            api_key="test-key-1234567890",
            quiet_mode=True,
            skip_context_files=True,
            skip_memory=True,
        )
        instance.client = MagicMock()
        return instance
|
|
|
|
|
|
@pytest.fixture()
def agent_with_memory_tool():
    """Agent whose valid_tool_names includes 'memory'."""
    with (
        patch(
            "run_agent.get_tool_definitions",
            return_value=_make_tool_defs("web_search", "memory"),
        ),
        patch("run_agent.check_toolset_requirements", return_value={}),
        patch("run_agent.OpenAI"),
    ):
        a = AIAgent(
            # Full key literal, matching the `agent` fixture (the previous
            # "test-k...7890" value looked like a truncated paste).
            api_key="test-key-1234567890",
            quiet_mode=True,
            skip_context_files=True,
            skip_memory=True,
        )
        a.client = MagicMock()
        return a
|
|
|
|
|
|
def test_aiagent_reuses_existing_errors_log_handler():
    """Repeated AIAgent init should not accumulate duplicate errors.log handlers."""
    root_logger = logging.getLogger()
    # Snapshot the real handler set so the finally block can restore it even
    # if the assertions fail.
    original_handlers = list(root_logger.handlers)
    error_log_path = (run_agent._hermes_home / "logs" / "errors.log").resolve()

    try:
        # Start from a clean root logger.
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)

        # Pre-install one handler pointing at errors.log; AIAgent should
        # detect and reuse it instead of adding its own.
        error_log_path.parent.mkdir(parents=True, exist_ok=True)
        preexisting_handler = RotatingFileHandler(
            error_log_path,
            maxBytes=2 * 1024 * 1024,
            backupCount=2,
        )
        root_logger.addHandler(preexisting_handler)

        with (
            patch(
                "run_agent.get_tool_definitions",
                return_value=_make_tool_defs("web_search"),
            ),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
        ):
            # Two constructions in a row must still leave exactly one handler.
            # Full key literal (was a truncated "test-k...7890" paste).
            AIAgent(
                api_key="test-key-1234567890",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            AIAgent(
                api_key="test-key-1234567890",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )

        matching_handlers = [
            handler for handler in root_logger.handlers
            if isinstance(handler, RotatingFileHandler)
            and error_log_path == Path(handler.baseFilename).resolve()
        ]
        assert len(matching_handlers) == 1
    finally:
        # Remove anything the test added (closing it) and restore the
        # original handler set.
        for handler in list(root_logger.handlers):
            root_logger.removeHandler(handler)
            if handler not in original_handlers:
                handler.close()
        for handler in original_handlers:
            root_logger.addHandler(handler)
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Helper to build mock assistant messages (API response objects)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
def _mock_assistant_msg(
|
|
content="Hello",
|
|
tool_calls=None,
|
|
reasoning=None,
|
|
reasoning_content=None,
|
|
reasoning_details=None,
|
|
):
|
|
"""Return a SimpleNamespace mimicking an OpenAI ChatCompletionMessage."""
|
|
msg = SimpleNamespace(content=content, tool_calls=tool_calls)
|
|
if reasoning is not None:
|
|
msg.reasoning = reasoning
|
|
if reasoning_content is not None:
|
|
msg.reasoning_content = reasoning_content
|
|
if reasoning_details is not None:
|
|
msg.reasoning_details = reasoning_details
|
|
return msg
|
|
|
|
|
|
def _mock_tool_call(name="web_search", arguments="{}", call_id=None):
|
|
"""Return a SimpleNamespace mimicking a tool call object."""
|
|
return SimpleNamespace(
|
|
id=call_id or f"call_{uuid.uuid4().hex[:8]}",
|
|
type="function",
|
|
function=SimpleNamespace(name=name, arguments=arguments),
|
|
)
|
|
|
|
|
|
def _mock_response(
    content="Hello", finish_reason="stop", tool_calls=None, reasoning=None, usage=None
):
    """Return a SimpleNamespace mimicking an OpenAI ChatCompletion response."""
    message = _mock_assistant_msg(
        content=content, tool_calls=tool_calls, reasoning=reasoning
    )
    choice = SimpleNamespace(message=message, finish_reason=finish_reason)
    # Usage is optional on real responses; a falsy dict maps to None here.
    usage_ns = SimpleNamespace(**usage) if usage else None
    return SimpleNamespace(choices=[choice], model="test/model", usage=usage_ns)
|
|
|
|
|
|
# ===================================================================
|
|
# Group 1: Pure Functions
|
|
# ===================================================================
|
|
|
|
|
|
class TestHasContentAfterThinkBlock:
    """_has_content_after_think_block: does real content follow the think tag?"""

    def test_none_returns_false(self, agent):
        result = agent._has_content_after_think_block(None)
        assert result is False

    def test_empty_returns_false(self, agent):
        result = agent._has_content_after_think_block("")
        assert result is False

    def test_only_think_block_returns_false(self, agent):
        result = agent._has_content_after_think_block("<think>reasoning</think>")
        assert result is False

    def test_content_after_think_returns_true(self, agent):
        result = agent._has_content_after_think_block("<think>r</think> actual answer")
        assert result is True

    def test_no_think_block_returns_true(self, agent):
        result = agent._has_content_after_think_block("just normal content")
        assert result is True
|
|
|
|
|
|
class TestStripThinkBlocks:
    """_strip_think_blocks: remove <think>...</think> spans, keep the rest."""

    def test_none_returns_empty(self, agent):
        assert agent._strip_think_blocks(None) == ""

    def test_no_blocks_unchanged(self, agent):
        stripped = agent._strip_think_blocks("hello world")
        assert stripped == "hello world"

    def test_single_block_removed(self, agent):
        stripped = agent._strip_think_blocks("<think>reasoning</think> answer")
        assert "answer" in stripped
        assert "reasoning" not in stripped

    def test_multiline_block_removed(self, agent):
        source = "<think>\nline1\nline2\n</think>\nvisible"
        stripped = agent._strip_think_blocks(source)
        assert "visible" in stripped
        assert "line1" not in stripped
|
|
|
|
|
|
class TestExtractReasoning:
    """_extract_reasoning: pull reasoning text out of assistant messages."""

    def test_reasoning_field(self, agent):
        message = _mock_assistant_msg(reasoning="thinking hard")
        assert agent._extract_reasoning(message) == "thinking hard"

    def test_reasoning_content_field(self, agent):
        message = _mock_assistant_msg(reasoning_content="deep thought")
        assert agent._extract_reasoning(message) == "deep thought"

    def test_reasoning_details_array(self, agent):
        message = _mock_assistant_msg(
            reasoning_details=[{"summary": "step-by-step analysis"}]
        )
        extracted = agent._extract_reasoning(message)
        assert "step-by-step analysis" in extracted

    def test_no_reasoning_returns_none(self, agent):
        assert agent._extract_reasoning(_mock_assistant_msg()) is None

    def test_combined_reasoning(self, agent):
        # Distinct reasoning and reasoning_content should both survive.
        message = _mock_assistant_msg(reasoning="part1", reasoning_content="part2")
        extracted = agent._extract_reasoning(message)
        assert "part1" in extracted
        assert "part2" in extracted

    def test_deduplication(self, agent):
        # Identical text across both fields must not be doubled.
        message = _mock_assistant_msg(
            reasoning="same text", reasoning_content="same text"
        )
        assert agent._extract_reasoning(message) == "same text"

    @pytest.mark.parametrize(
        ("content", "expected"),
        [
            ("<think>thinking hard</think>", "thinking hard"),
            ("<thinking>step by step</thinking>", "step by step"),
            (
                "<REASONING_SCRATCHPAD>scratch analysis</REASONING_SCRATCHPAD>",
                "scratch analysis",
            ),
        ],
    )
    def test_inline_reasoning_blocks_fallback(self, agent, content, expected):
        # With no reasoning fields, inline think-style tags are the fallback.
        message = _mock_assistant_msg(content=content)
        assert agent._extract_reasoning(message) == expected
|
|
|
|
|
|
class TestCleanSessionContent:
    """_clean_session_content: static normalization of persisted content."""

    def test_none_passthrough(self):
        assert AIAgent._clean_session_content(None) is None

    def test_scratchpad_converted(self):
        raw = "<REASONING_SCRATCHPAD>think</REASONING_SCRATCHPAD> answer"
        cleaned = AIAgent._clean_session_content(raw)
        assert "<think>" in cleaned
        assert "<REASONING_SCRATCHPAD>" not in cleaned

    def test_extra_newlines_cleaned(self):
        raw = "\n\n\n<think>x</think>\n\n\nafter"
        cleaned = AIAgent._clean_session_content(raw)
        # Runs of three-plus newlines around the think block must be collapsed...
        assert "\n\n\n" not in cleaned
        # ...while content after the think block is preserved.
        assert "after" in cleaned
|
|
|
|
|
|
class TestGetMessagesUpToLastAssistant:
    """_get_messages_up_to_last_assistant: truncate history before last reply."""

    def test_empty_list(self, agent):
        assert agent._get_messages_up_to_last_assistant([]) == []

    def test_no_assistant_returns_copy(self, agent):
        history = [{"role": "user", "content": "hi"}]
        truncated = agent._get_messages_up_to_last_assistant(history)
        assert truncated == history
        # Must be a defensive copy, not the same list object.
        assert truncated is not history

    def test_single_assistant(self, agent):
        history = [
            {"role": "user", "content": "hi"},
            {"role": "assistant", "content": "hello"},
        ]
        truncated = agent._get_messages_up_to_last_assistant(history)
        assert len(truncated) == 1
        assert truncated[0]["role"] == "user"

    def test_multiple_assistants_returns_up_to_last(self, agent):
        history = [
            {"role": "user", "content": "q1"},
            {"role": "assistant", "content": "a1"},
            {"role": "user", "content": "q2"},
            {"role": "assistant", "content": "a2"},
        ]
        truncated = agent._get_messages_up_to_last_assistant(history)
        assert len(truncated) == 3
        assert truncated[-1]["content"] == "q2"

    def test_assistant_then_tool_messages(self, agent):
        history = [
            {"role": "user", "content": "do something"},
            {"role": "assistant", "content": "ok", "tool_calls": [{"id": "1"}]},
            {"role": "tool", "content": "result", "tool_call_id": "1"},
        ]
        # Last assistant sits at index 1, so only history[:1] should remain.
        truncated = agent._get_messages_up_to_last_assistant(history)
        assert len(truncated) == 1
        assert truncated[0]["role"] == "user"
|
|
|
|
|
|
class TestMaskApiKey:
    """_mask_api_key_for_logs: keys never appear verbatim in logs."""

    def test_none_returns_none(self, agent):
        assert agent._mask_api_key_for_logs(None) is None

    def test_short_key_returns_stars(self, agent):
        assert agent._mask_api_key_for_logs("short") == "***"

    def test_long_key_masked(self, agent):
        masked = agent._mask_api_key_for_logs("sk-or-v1-abcdefghijklmnop")
        # Prefix and suffix survive with an ellipsis between them.
        assert masked.startswith("sk-or-v1")
        assert "..." in masked
        assert masked.endswith("mnop")
|
|
|
|
|
|
# ===================================================================
|
|
# Group 2: State / Structure Methods
|
|
# ===================================================================
|
|
|
|
|
|
class TestInit:
    """AIAgent.__init__ behavior: provider routing, prompt caching, tool names,
    session id generation. All external dependencies (tool loading, OpenAI /
    Anthropic SDKs) are patched out."""

    def test_anthropic_base_url_accepted(self):
        """Anthropic base URLs should route to native Anthropic client."""
        with (
            patch("run_agent.get_tool_definitions", return_value=[]),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("agent.anthropic_adapter._anthropic_sdk") as mock_anthropic,
        ):
            agent = AIAgent(
                api_key="test-key-1234567890",
                base_url="https://api.anthropic.com/v1/",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            # The agent must switch API modes and build the native SDK client.
            assert agent.api_mode == "anthropic_messages"
            mock_anthropic.Anthropic.assert_called_once()

    def test_prompt_caching_claude_openrouter(self):
        """Claude model via OpenRouter should enable prompt caching."""
        with (
            patch("run_agent.get_tool_definitions", return_value=[]),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
        ):
            a = AIAgent(
                api_key="test-key-1234567890",
                model="anthropic/claude-sonnet-4-20250514",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            assert a._use_prompt_caching is True

    def test_prompt_caching_non_claude(self):
        """Non-Claude model should disable prompt caching."""
        with (
            patch("run_agent.get_tool_definitions", return_value=[]),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
        ):
            a = AIAgent(
                api_key="test-key-1234567890",
                model="openai/gpt-4o",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            assert a._use_prompt_caching is False

    def test_prompt_caching_non_openrouter(self):
        """Custom base_url (not OpenRouter) should disable prompt caching."""
        with (
            patch("run_agent.get_tool_definitions", return_value=[]),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
        ):
            # Claude model name alone is not enough; the route matters too.
            a = AIAgent(
                api_key="test-key-1234567890",
                model="anthropic/claude-sonnet-4-20250514",
                base_url="http://localhost:8080/v1",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            assert a._use_prompt_caching is False

    def test_prompt_caching_native_anthropic(self):
        """Native Anthropic provider should enable prompt caching."""
        with (
            patch("run_agent.get_tool_definitions", return_value=[]),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("agent.anthropic_adapter._anthropic_sdk"),
        ):
            a = AIAgent(
                api_key="test-key-1234567890",
                base_url="https://api.anthropic.com/v1/",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            assert a.api_mode == "anthropic_messages"
            assert a._use_prompt_caching is True

    def test_valid_tool_names_populated(self):
        """valid_tool_names should contain names from loaded tools."""
        tools = _make_tool_defs("web_search", "terminal")
        with (
            patch("run_agent.get_tool_definitions", return_value=tools),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
        ):
            a = AIAgent(
                api_key="test-key-1234567890",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            assert a.valid_tool_names == {"web_search", "terminal"}

    def test_session_id_auto_generated(self):
        """Session ID should be auto-generated in YYYYMMDD_HHMMSS_<hex6> format."""
        with (
            patch("run_agent.get_tool_definitions", return_value=[]),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
        ):
            a = AIAgent(
                api_key="test-key-1234567890",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            # Format: YYYYMMDD_HHMMSS_<6 hex chars>
            assert re.match(r"^\d{8}_\d{6}_[0-9a-f]{6}$", a.session_id), (
                f"session_id doesn't match expected format: {a.session_id}"
            )
|
|
|
|
|
|
class TestInterrupt:
    """interrupt() / clear_interrupt() flag handling.

    The module-level `run_agent._set_interrupt` side channel is patched out so
    only the agent's own state is exercised."""

    def test_interrupt_sets_flag(self, agent):
        # A bare interrupt raises the request flag.
        with patch("run_agent._set_interrupt"):
            agent.interrupt()
            assert agent._interrupt_requested is True

    def test_interrupt_with_message(self, agent):
        # An optional message is stashed for the conversation loop to pick up.
        with patch("run_agent._set_interrupt"):
            agent.interrupt("new question")
            assert agent._interrupt_message == "new question"

    def test_clear_interrupt(self, agent):
        # Clearing resets both the flag and the stashed message.
        with patch("run_agent._set_interrupt"):
            agent.interrupt("msg")
            agent.clear_interrupt()
            assert agent._interrupt_requested is False
            assert agent._interrupt_message is None

    def test_is_interrupted_property(self, agent):
        # The read-only property mirrors the internal flag.
        assert agent.is_interrupted is False
        with patch("run_agent._set_interrupt"):
            agent.interrupt()
            assert agent.is_interrupted is True
|
|
|
|
|
|
class TestHydrateTodoStore:
    """_hydrate_todo_store: recover todo state from persisted history."""

    def test_no_todo_in_history(self, agent):
        chat = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
        ]
        with patch("run_agent._set_interrupt"):
            agent._hydrate_todo_store(chat)
            assert not agent._todo_store.has_items()

    def test_recovers_from_history(self, agent):
        todo_items = [{"id": "1", "content": "do thing", "status": "pending"}]
        chat = [
            {"role": "user", "content": "plan"},
            {"role": "assistant", "content": "ok"},
            {
                "role": "tool",
                "content": json.dumps({"todos": todo_items}),
                "tool_call_id": "c1",
            },
        ]
        with patch("run_agent._set_interrupt"):
            agent._hydrate_todo_store(chat)
            assert agent._todo_store.has_items()

    def test_skips_non_todo_tools(self, agent):
        # Tool output without a "todos" payload must be ignored.
        chat = [
            {
                "role": "tool",
                "content": '{"result": "search done"}',
                "tool_call_id": "c1",
            },
        ]
        with patch("run_agent._set_interrupt"):
            agent._hydrate_todo_store(chat)
            assert not agent._todo_store.has_items()

    def test_invalid_json_skipped(self, agent):
        # Malformed JSON that merely mentions "todos" must not crash or hydrate.
        chat = [
            {
                "role": "tool",
                "content": 'not valid json "todos" oops',
                "tool_call_id": "c1",
            },
        ]
        with patch("run_agent._set_interrupt"):
            agent._hydrate_todo_store(chat)
            assert not agent._todo_store.has_items()
|
|
|
|
|
|
class TestBuildSystemPrompt:
    """_build_system_prompt: identity, optional sections, skills wiring."""

    def test_always_has_identity(self, agent):
        prompt = agent._build_system_prompt()
        assert DEFAULT_AGENT_IDENTITY in prompt

    def test_includes_system_message(self, agent):
        prompt = agent._build_system_prompt(system_message="Custom instruction")
        assert "Custom instruction" in prompt

    def test_memory_guidance_when_memory_tool_loaded(self, agent_with_memory_tool):
        from agent.prompt_builder import MEMORY_GUIDANCE

        prompt = agent_with_memory_tool._build_system_prompt()
        assert MEMORY_GUIDANCE in prompt

    def test_no_memory_guidance_without_tool(self, agent):
        from agent.prompt_builder import MEMORY_GUIDANCE

        prompt = agent._build_system_prompt()
        assert MEMORY_GUIDANCE not in prompt

    def test_includes_datetime(self, agent):
        prompt = agent._build_system_prompt()
        # Should contain current date info like "Conversation started:"
        assert "Conversation started:" in prompt

    def test_skills_prompt_derives_available_toolsets_from_loaded_tools(self):
        """Skills prompt must receive tool/toolset sets derived from loaded tools,
        without re-running toolset requirement checks."""
        tools = _make_tool_defs("web_search", "skills_list", "skill_view", "skill_manage")
        toolset_map = {
            "web_search": "web",
            "skills_list": "skills",
            "skill_view": "skills",
            "skill_manage": "skills",
        }

        with (
            patch("run_agent.get_tool_definitions", return_value=tools),
            # Prompt building must not re-check requirements; make it fatal if it does.
            patch(
                "run_agent.check_toolset_requirements",
                side_effect=AssertionError("should not re-check toolset requirements"),
            ),
            patch("run_agent.get_toolset_for_tool", create=True, side_effect=toolset_map.get),
            patch("run_agent.build_skills_system_prompt", return_value="SKILLS_PROMPT") as mock_skills,
            patch("run_agent.OpenAI"),
        ):
            agent = AIAgent(
                # Full key literal, matching the other fixtures (the previous
                # "test-k...7890" value looked like a truncated paste).
                api_key="test-key-1234567890",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )

            prompt = agent._build_system_prompt()

            assert "SKILLS_PROMPT" in prompt
            assert mock_skills.call_args.kwargs["available_tools"] == set(toolset_map)
            assert mock_skills.call_args.kwargs["available_toolsets"] == {"web", "skills"}
|
|
|
|
|
|
class TestInvalidateSystemPrompt:
    """_invalidate_system_prompt: drop the cache and refresh memory state."""

    def test_clears_cache(self, agent):
        agent._cached_system_prompt = "cached value"
        agent._invalidate_system_prompt()
        assert agent._cached_system_prompt is None

    def test_reloads_memory_store(self, agent):
        store = MagicMock()
        agent._memory_store = store
        agent._cached_system_prompt = "cached"
        agent._invalidate_system_prompt()
        # Invalidation must pull fresh memory contents from disk.
        store.load_from_disk.assert_called_once()
|
|
|
|
|
|
class TestBuildApiKwargs:
    """_build_api_kwargs: request construction per model/route.

    Tests mutate attributes on the shared `agent` fixture (model, base_url,
    reasoning_config, providers_allowed) before building kwargs."""

    def test_basic_kwargs(self, agent):
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        assert kwargs["model"] == agent.model
        # Messages list is passed through by reference, not copied.
        assert kwargs["messages"] is messages
        assert kwargs["timeout"] == 1800.0

    def test_provider_preferences_injected(self, agent):
        agent.providers_allowed = ["Anthropic"]
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        # OpenRouter provider pinning goes through extra_body.
        assert kwargs["extra_body"]["provider"]["only"] == ["Anthropic"]

    def test_reasoning_config_default_openrouter(self, agent):
        """Default reasoning config for OpenRouter should be medium."""
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        reasoning = kwargs["extra_body"]["reasoning"]
        assert reasoning["enabled"] is True
        assert reasoning["effort"] == "medium"

    def test_reasoning_config_custom(self, agent):
        # An explicit reasoning_config overrides the default wholesale.
        agent.reasoning_config = {"enabled": False}
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        assert kwargs["extra_body"]["reasoning"] == {"enabled": False}

    def test_reasoning_not_sent_for_unsupported_openrouter_model(self, agent):
        agent.model = "minimax/minimax-m2.5"
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        assert "reasoning" not in kwargs.get("extra_body", {})

    def test_reasoning_sent_for_supported_openrouter_model(self, agent):
        agent.model = "qwen/qwen3.5-plus-02-15"
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        assert kwargs["extra_body"]["reasoning"]["effort"] == "medium"

    def test_reasoning_sent_for_nous_route(self, agent):
        # Same model unsupported on OpenRouter gets reasoning on the Nous route.
        agent.base_url = "https://inference-api.nousresearch.com/v1"
        agent.model = "minimax/minimax-m2.5"
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        assert kwargs["extra_body"]["reasoning"]["effort"] == "medium"

    def test_reasoning_sent_for_copilot_gpt5(self, agent):
        agent.base_url = "https://api.githubcopilot.com"
        agent.model = "gpt-5.4"
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        # Copilot route sends effort only (no "enabled" key).
        assert kwargs["extra_body"]["reasoning"] == {"effort": "medium"}

    def test_reasoning_xhigh_normalized_for_copilot(self, agent):
        agent.base_url = "https://api.githubcopilot.com"
        agent.model = "gpt-5.4"
        agent.reasoning_config = {"enabled": True, "effort": "xhigh"}
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        # "xhigh" is not a Copilot effort level; it must be clamped to "high".
        assert kwargs["extra_body"]["reasoning"] == {"effort": "high"}

    def test_reasoning_omitted_for_non_reasoning_copilot_model(self, agent):
        agent.base_url = "https://api.githubcopilot.com"
        agent.model = "gpt-4.1"
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        assert "reasoning" not in kwargs.get("extra_body", {})

    def test_max_tokens_injected(self, agent):
        agent.max_tokens = 4096
        messages = [{"role": "user", "content": "hi"}]
        kwargs = agent._build_api_kwargs(messages)
        assert kwargs["max_tokens"] == 4096
|
|
|
|
|
|
class TestBuildAssistantMessage:
    """_build_assistant_message: convert an API message object into the dict
    stored in conversation history."""

    def test_basic_message(self, agent):
        msg = _mock_assistant_msg(content="Hello!")
        result = agent._build_assistant_message(msg, "stop")
        assert result["role"] == "assistant"
        assert result["content"] == "Hello!"
        assert result["finish_reason"] == "stop"

    def test_with_reasoning(self, agent):
        # Reasoning text from the API message is carried into the dict.
        msg = _mock_assistant_msg(content="answer", reasoning="thinking")
        result = agent._build_assistant_message(msg, "stop")
        assert result["reasoning"] == "thinking"

    def test_with_tool_calls(self, agent):
        tc = _mock_tool_call(name="web_search", arguments='{"q":"test"}', call_id="c1")
        msg = _mock_assistant_msg(content="", tool_calls=[tc])
        result = agent._build_assistant_message(msg, "tool_calls")
        assert len(result["tool_calls"]) == 1
        assert result["tool_calls"][0]["function"]["name"] == "web_search"

    def test_with_reasoning_details(self, agent):
        # Structured reasoning_details entries survive the conversion.
        details = [{"type": "reasoning.summary", "text": "step1", "signature": "sig1"}]
        msg = _mock_assistant_msg(content="ans", reasoning_details=details)
        result = agent._build_assistant_message(msg, "stop")
        assert "reasoning_details" in result
        assert result["reasoning_details"][0]["text"] == "step1"

    def test_empty_content(self, agent):
        # None content normalizes to an empty string.
        msg = _mock_assistant_msg(content=None)
        result = agent._build_assistant_message(msg, "stop")
        assert result["content"] == ""

    def test_tool_call_extra_content_preserved(self, agent):
        """Gemini thinking models attach extra_content with thought_signature
        to tool calls. This must be preserved so subsequent API calls include it."""
        tc = _mock_tool_call(
            name="get_weather", arguments='{"city":"NYC"}', call_id="c2"
        )
        tc.extra_content = {"google": {"thought_signature": "abc123"}}
        msg = _mock_assistant_msg(content="", tool_calls=[tc])
        result = agent._build_assistant_message(msg, "tool_calls")
        assert result["tool_calls"][0]["extra_content"] == {
            "google": {"thought_signature": "abc123"}
        }

    def test_tool_call_without_extra_content(self, agent):
        """Standard tool calls (no thinking model) should not have extra_content."""
        tc = _mock_tool_call(name="web_search", arguments="{}", call_id="c3")
        msg = _mock_assistant_msg(content="", tool_calls=[tc])
        result = agent._build_assistant_message(msg, "tool_calls")
        assert "extra_content" not in result["tool_calls"][0]
|
|
|
|
|
|
class TestFormatToolsForSystemMessage:
    """_format_tools_for_system_message: JSON summary of loaded tools."""

    def test_no_tools_returns_empty_array(self, agent):
        agent.tools = []
        assert agent._format_tools_for_system_message() == "[]"

    def test_formats_single_tool(self, agent):
        agent.tools = _make_tool_defs("web_search")
        decoded = json.loads(agent._format_tools_for_system_message())
        assert len(decoded) == 1
        assert decoded[0]["name"] == "web_search"

    def test_formats_multiple_tools(self, agent):
        agent.tools = _make_tool_defs("web_search", "terminal", "read_file")
        decoded = json.loads(agent._format_tools_for_system_message())
        assert len(decoded) == 3
        assert {entry["name"] for entry in decoded} == {
            "web_search",
            "terminal",
            "read_file",
        }
|
|
|
|
|
|
# ===================================================================
|
|
# Group 3: Conversation Loop Pieces (OpenAI mock)
|
|
# ===================================================================
|
|
|
|
|
|
class TestExecuteToolCalls:
    """_execute_tool_calls: dispatching tool calls to handle_function_call and
    appending tool-result messages to the conversation."""

    def test_single_tool_executed(self, agent):
        tc = _mock_tool_call(name="web_search", arguments='{"q":"test"}', call_id="c1")
        mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
        messages = []
        with patch(
            "run_agent.handle_function_call", return_value="search result"
        ) as mock_hfc:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
            # enabled_tools passes the agent's own valid_tool_names
            args, kwargs = mock_hfc.call_args
            assert args[:3] == ("web_search", {"q": "test"}, "task-1")
            assert set(kwargs.get("enabled_tools", [])) == agent.valid_tool_names
            # One tool-result message is appended per call.
            assert len(messages) == 1
            assert messages[0]["role"] == "tool"
            assert "search result" in messages[0]["content"]

    def test_interrupt_skips_remaining(self, agent):
        tc1 = _mock_tool_call(name="web_search", arguments="{}", call_id="c1")
        tc2 = _mock_tool_call(name="web_search", arguments="{}", call_id="c2")
        mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
        messages = []

        # Raise the interrupt flag before any tool runs.
        with patch("run_agent._set_interrupt"):
            agent.interrupt()

        agent._execute_tool_calls(mock_msg, messages, "task-1")
        # Both calls should be skipped with cancellation messages
        assert len(messages) == 2
        assert (
            "cancelled" in messages[0]["content"].lower()
            or "interrupted" in messages[0]["content"].lower()
        )

    def test_invalid_json_args_defaults_empty(self, agent):
        tc = _mock_tool_call(
            name="web_search", arguments="not valid json", call_id="c1"
        )
        mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
        messages = []
        with patch("run_agent.handle_function_call", return_value="ok") as mock_hfc:
            agent._execute_tool_calls(mock_msg, messages, "task-1")
            # Invalid JSON args should fall back to empty dict
            args, kwargs = mock_hfc.call_args
            assert args[:3] == ("web_search", {}, "task-1")
            assert set(kwargs.get("enabled_tools", [])) == agent.valid_tool_names
            assert len(messages) == 1
            assert messages[0]["role"] == "tool"
            assert messages[0]["tool_call_id"] == "c1"

    def test_result_truncation_over_100k(self, agent):
        tc = _mock_tool_call(name="web_search", arguments="{}", call_id="c1")
        mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
        messages = []
        # Oversized tool output must be trimmed before entering history.
        big_result = "x" * 150_000
        with patch("run_agent.handle_function_call", return_value=big_result):
            agent._execute_tool_calls(mock_msg, messages, "task-1")
            # Content should be truncated
            assert len(messages[0]["content"]) < 150_000
            assert "Truncated" in messages[0]["content"]
|
|
|
|
|
|
class TestConcurrentToolExecution:
    """Tests for _execute_tool_calls_concurrent and dispatch logic.

    The dispatch tests verify which execution path the agent chooses for a
    given batch of tool calls (sequential vs concurrent); the remaining
    tests exercise the concurrent path directly.
    """

    @staticmethod
    def _assert_dispatch(agent, tool_calls, *, expect_concurrent):
        """Run _execute_tool_calls and assert which path handled the batch.

        Patches both execution paths so no tool actually runs; asserts
        exactly one of them was invoked, per *expect_concurrent*.
        """
        mock_msg = _mock_assistant_msg(content="", tool_calls=tool_calls)
        with (
            patch.object(agent, "_execute_tool_calls_sequential") as mock_seq,
            patch.object(agent, "_execute_tool_calls_concurrent") as mock_con,
        ):
            agent._execute_tool_calls(mock_msg, [], "task-1")
        if expect_concurrent:
            mock_con.assert_called_once()
            mock_seq.assert_not_called()
        else:
            mock_seq.assert_called_once()
            mock_con.assert_not_called()

    def test_single_tool_uses_sequential_path(self, agent):
        """Single tool call should use sequential path, not concurrent."""
        tc = _mock_tool_call(name="web_search", arguments='{"q":"test"}', call_id="c1")
        self._assert_dispatch(agent, [tc], expect_concurrent=False)

    def test_clarify_forces_sequential(self, agent):
        """Batch containing clarify should use sequential path."""
        tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
        tc2 = _mock_tool_call(name="clarify", arguments='{"question":"ok?"}', call_id="c2")
        self._assert_dispatch(agent, [tc1, tc2], expect_concurrent=False)

    def test_multiple_tools_uses_concurrent_path(self, agent):
        """Multiple read-only tools should use concurrent path."""
        tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
        tc2 = _mock_tool_call(name="read_file", arguments='{"path":"x.py"}', call_id="c2")
        self._assert_dispatch(agent, [tc1, tc2], expect_concurrent=True)

    def test_terminal_batch_forces_sequential(self, agent):
        """Stateful tools should not share the concurrent execution path."""
        tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
        tc2 = _mock_tool_call(name="terminal", arguments='{"command":"pwd"}', call_id="c2")
        self._assert_dispatch(agent, [tc1, tc2], expect_concurrent=False)

    def test_write_batch_forces_sequential(self, agent):
        """File mutations should stay ordered within a turn."""
        tc1 = _mock_tool_call(name="read_file", arguments='{"path":"x.py"}', call_id="c1")
        tc2 = _mock_tool_call(
            name="write_file", arguments='{"path":"x.py","content":"print(1)"}', call_id="c2"
        )
        self._assert_dispatch(agent, [tc1, tc2], expect_concurrent=False)

    def test_disjoint_write_batch_uses_concurrent_path(self, agent):
        """Independent file writes should still run concurrently."""
        tc1 = _mock_tool_call(
            name="write_file",
            arguments='{"path":"src/a.py","content":"print(1)"}',
            call_id="c1",
        )
        tc2 = _mock_tool_call(
            name="write_file",
            arguments='{"path":"src/b.py","content":"print(2)"}',
            call_id="c2",
        )
        self._assert_dispatch(agent, [tc1, tc2], expect_concurrent=True)

    def test_overlapping_write_batch_forces_sequential(self, agent):
        """Writes to the same file must stay ordered."""
        tc1 = _mock_tool_call(
            name="write_file",
            arguments='{"path":"src/a.py","content":"print(1)"}',
            call_id="c1",
        )
        tc2 = _mock_tool_call(
            name="patch",
            arguments='{"path":"src/a.py","old_string":"1","new_string":"2"}',
            call_id="c2",
        )
        self._assert_dispatch(agent, [tc1, tc2], expect_concurrent=False)

    def test_malformed_json_args_forces_sequential(self, agent):
        """Unparseable tool arguments should fall back to sequential."""
        tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
        tc2 = _mock_tool_call(name="web_search", arguments="NOT JSON {{{", call_id="c2")
        self._assert_dispatch(agent, [tc1, tc2], expect_concurrent=False)

    def test_non_dict_args_forces_sequential(self, agent):
        """Tool arguments that parse to a non-dict type should fall back to sequential."""
        tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
        tc2 = _mock_tool_call(name="web_search", arguments='"just a string"', call_id="c2")
        self._assert_dispatch(agent, [tc1, tc2], expect_concurrent=False)

    def test_concurrent_executes_all_tools(self, agent):
        """Concurrent path should execute all tools and append results in order."""
        tc1 = _mock_tool_call(name="web_search", arguments='{"q":"alpha"}', call_id="c1")
        tc2 = _mock_tool_call(name="web_search", arguments='{"q":"beta"}', call_id="c2")
        tc3 = _mock_tool_call(name="web_search", arguments='{"q":"gamma"}', call_id="c3")
        mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2, tc3])
        messages = []

        call_log = []

        def fake_handle(name, args, task_id, **kwargs):
            call_log.append(name)
            return json.dumps({"result": args.get("q", "")})

        with patch("run_agent.handle_function_call", side_effect=fake_handle):
            agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")

        assert len(messages) == 3
        # Results must be in original order
        assert messages[0]["tool_call_id"] == "c1"
        assert messages[1]["tool_call_id"] == "c2"
        assert messages[2]["tool_call_id"] == "c3"
        # All should be tool messages
        assert all(m["role"] == "tool" for m in messages)
        # Content should contain the query results
        assert "alpha" in messages[0]["content"]
        assert "beta" in messages[1]["content"]
        assert "gamma" in messages[2]["content"]

    def test_concurrent_preserves_order_despite_timing(self, agent):
        """Even if tools finish in different order, messages should be in original order."""
        import time as _time

        tc1 = _mock_tool_call(name="web_search", arguments='{"q":"slow"}', call_id="c1")
        tc2 = _mock_tool_call(name="web_search", arguments='{"q":"fast"}', call_id="c2")
        mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
        messages = []

        def fake_handle(name, args, task_id, **kwargs):
            q = args.get("q", "")
            if q == "slow":
                _time.sleep(0.1)  # Slow tool
            return f"result_{q}"

        with patch("run_agent.handle_function_call", side_effect=fake_handle):
            agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")

        assert messages[0]["tool_call_id"] == "c1"
        assert "result_slow" in messages[0]["content"]
        assert messages[1]["tool_call_id"] == "c2"
        assert "result_fast" in messages[1]["content"]

    def test_concurrent_handles_tool_error(self, agent):
        """If one tool raises, others should still complete."""
        tc1 = _mock_tool_call(name="web_search", arguments='{"fail": true}', call_id="c1")
        tc2 = _mock_tool_call(name="web_search", arguments='{}', call_id="c2")
        mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
        messages = []

        def fake_handle(name, args, task_id, **kwargs):
            # Tie the failure to the call's own arguments rather than to
            # invocation order: with concurrent execution either call may
            # run first, so an order-based trigger ("first call raises")
            # would make the error land on c2 intermittently.
            if args.get("fail"):
                raise RuntimeError("boom")
            return "success"

        with patch("run_agent.handle_function_call", side_effect=fake_handle):
            agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")

        assert len(messages) == 2
        # First tool should have error
        assert "Error" in messages[0]["content"] or "boom" in messages[0]["content"]
        # Second tool should succeed
        assert "success" in messages[1]["content"]

    def test_concurrent_interrupt_before_start(self, agent):
        """If interrupt is requested before concurrent execution, all tools are skipped."""
        tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
        tc2 = _mock_tool_call(name="read_file", arguments='{}', call_id="c2")
        mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
        messages = []

        with patch("run_agent._set_interrupt"):
            agent.interrupt()

        agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
        assert len(messages) == 2
        assert "cancelled" in messages[0]["content"].lower() or "skipped" in messages[0]["content"].lower()
        assert "cancelled" in messages[1]["content"].lower() or "skipped" in messages[1]["content"].lower()

    def test_concurrent_truncates_large_results(self, agent):
        """Concurrent path should truncate results over 100k chars."""
        tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
        tc2 = _mock_tool_call(name="web_search", arguments='{}', call_id="c2")
        mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
        messages = []
        big_result = "x" * 150_000

        with patch("run_agent.handle_function_call", return_value=big_result):
            agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")

        assert len(messages) == 2
        for m in messages:
            assert len(m["content"]) < 150_000
            assert "Truncated" in m["content"]

    def test_invoke_tool_dispatches_to_handle_function_call(self, agent):
        """_invoke_tool should route regular tools through handle_function_call."""
        with patch("run_agent.handle_function_call", return_value="result") as mock_hfc:
            result = agent._invoke_tool("web_search", {"q": "test"}, "task-1")
        mock_hfc.assert_called_once_with(
            "web_search", {"q": "test"}, "task-1",
            enabled_tools=list(agent.valid_tool_names),
            honcho_manager=None,
            honcho_session_key=None,
        )
        assert result == "result"

    def test_invoke_tool_handles_agent_level_tools(self, agent):
        """_invoke_tool should handle todo tool directly."""
        with patch("tools.todo_tool.todo_tool", return_value='{"ok":true}') as mock_todo:
            result = agent._invoke_tool("todo", {"todos": []}, "task-1")
        mock_todo.assert_called_once()
        assert "ok" in result
|
|
|
|
|
|
class TestPathsOverlap:
    """Unit tests for the _paths_overlap helper."""

    def test_same_path_overlaps(self):
        from run_agent import _paths_overlap

        # Identical paths trivially conflict.
        assert _paths_overlap(Path("src/a.py"), Path("src/a.py"))

    def test_siblings_do_not_overlap(self):
        from run_agent import _paths_overlap

        assert not _paths_overlap(Path("src/a.py"), Path("src/b.py"))

    def test_parent_child_overlap(self):
        from run_agent import _paths_overlap

        # A directory overlaps any file nested beneath it.
        assert _paths_overlap(Path("src"), Path("src/sub/a.py"))

    def test_different_roots_do_not_overlap(self):
        from run_agent import _paths_overlap

        assert not _paths_overlap(Path("src/a.py"), Path("other/a.py"))

    def test_nested_vs_flat_do_not_overlap(self):
        from run_agent import _paths_overlap

        assert not _paths_overlap(Path("src/sub/a.py"), Path("src/a.py"))

    def test_empty_paths_do_not_overlap(self):
        from run_agent import _paths_overlap

        assert not _paths_overlap(Path(""), Path(""))

    def test_one_empty_path_does_not_overlap(self):
        from run_agent import _paths_overlap

        empty, real = Path(""), Path("src/a.py")
        assert not _paths_overlap(empty, real)
        assert not _paths_overlap(real, empty)
|
|
|
|
|
|
class TestHandleMaxIterations:
    """Behaviour of the max-iteration summarisation fallback."""

    def test_returns_summary(self, agent):
        """On success the summarisation call's content is returned verbatim-ish."""
        agent.client.chat.completions.create.return_value = _mock_response(
            content="Here is a summary of what I did."
        )
        agent._cached_system_prompt = "You are helpful."
        summary = agent._handle_max_iterations(
            [{"role": "user", "content": "do stuff"}], 60
        )
        assert isinstance(summary, str)
        assert len(summary) > 0
        assert "summary" in summary.lower()

    def test_api_failure_returns_error(self, agent):
        """If the summary request fails, an error string mentioning the cause is returned."""
        agent.client.chat.completions.create.side_effect = Exception("API down")
        agent._cached_system_prompt = "You are helpful."
        outcome = agent._handle_max_iterations(
            [{"role": "user", "content": "do stuff"}], 60
        )
        assert isinstance(outcome, str)
        assert "error" in outcome.lower()
        assert "API down" in outcome

    def test_summary_skips_reasoning_for_unsupported_openrouter_model(self, agent):
        """Models without reasoning support must not get a `reasoning` extra_body key."""
        agent.model = "minimax/minimax-m2.5"
        agent.client.chat.completions.create.return_value = _mock_response(
            content="Summary"
        )
        agent._cached_system_prompt = "You are helpful."

        outcome = agent._handle_max_iterations(
            [{"role": "user", "content": "do stuff"}], 60
        )

        assert outcome == "Summary"
        create_kwargs = agent.client.chat.completions.create.call_args.kwargs
        assert "reasoning" not in create_kwargs.get("extra_body", {})
|
|
|
|
|
|
class TestRunConversation:
    """Tests for the main run_conversation method.

    Each test mocks client.chat.completions.create to return controlled
    responses, exercising different code paths without real API calls.
    """

    def _setup_agent(self, agent):
        """Common setup for run_conversation tests."""
        agent._cached_system_prompt = "You are helpful."
        agent._use_prompt_caching = False
        agent.tool_delay = 0
        agent.compression_enabled = False
        agent.save_trajectories = False

    @staticmethod
    def _persistence_patches(agent):
        """Patchers that silence session persistence, trajectory saving and cleanup."""
        return (
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        )

    def test_stop_finish_reason_returns_response(self, agent):
        """A plain 'stop' turn yields its content as the final answer."""
        self._setup_agent(agent)
        agent.client.chat.completions.create.return_value = _mock_response(
            content="Final answer", finish_reason="stop"
        )
        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with p_sess, p_traj, p_clean:
            outcome = agent.run_conversation("hello")
        assert outcome["final_response"] == "Final answer"
        assert outcome["completed"] is True

    def test_tool_calls_then_stop(self, agent):
        """A tool-call turn followed by a stop turn completes in two API calls."""
        self._setup_agent(agent)
        tc = _mock_tool_call(name="web_search", arguments="{}", call_id="c1")
        agent.client.chat.completions.create.side_effect = [
            _mock_response(content="", finish_reason="tool_calls", tool_calls=[tc]),
            _mock_response(content="Done searching", finish_reason="stop"),
        ]
        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with (
            patch("run_agent.handle_function_call", return_value="search result"),
            p_sess,
            p_traj,
            p_clean,
        ):
            outcome = agent.run_conversation("search something")
        assert outcome["final_response"] == "Done searching"
        assert outcome["api_calls"] == 2

    def test_interrupt_breaks_loop(self, agent):
        """An interrupt raised mid-API-call surfaces interrupted=True."""
        self._setup_agent(agent)

        def interrupt_side_effect(api_kwargs):
            agent._interrupt_requested = True
            raise InterruptedError("Agent interrupted during API call")

        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with (
            p_sess,
            p_traj,
            p_clean,
            patch("run_agent._set_interrupt"),
            patch.object(
                agent, "_interruptible_api_call", side_effect=interrupt_side_effect
            ),
        ):
            outcome = agent.run_conversation("hello")
        assert outcome["interrupted"] is True

    def test_invalid_tool_name_retry(self, agent):
        """Model hallucinates an invalid tool name, agent retries and succeeds."""
        self._setup_agent(agent)
        bad_tc = _mock_tool_call(name="nonexistent_tool", arguments="{}", call_id="c1")
        agent.client.chat.completions.create.side_effect = [
            _mock_response(content="", finish_reason="tool_calls", tool_calls=[bad_tc]),
            _mock_response(content="Got it", finish_reason="stop"),
        ]
        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with p_sess, p_traj, p_clean:
            outcome = agent.run_conversation("do something")
        assert outcome["final_response"] == "Got it"
        assert outcome["completed"] is True
        assert outcome["api_calls"] == 2

    def test_empty_content_retry_uses_inline_reasoning_as_response(self, agent):
        """Reasoning-only payloads should recover the inline reasoning text."""
        self._setup_agent(agent)
        think_only = _mock_response(
            content="<think>internal reasoning</think>",
            finish_reason="stop",
        )
        # Same reasoning-only payload for all three retry attempts.
        agent.client.chat.completions.create.side_effect = [
            think_only,
            think_only,
            think_only,
        ]
        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with p_sess, p_traj, p_clean:
            outcome = agent.run_conversation("answer me")
        assert outcome["completed"] is True
        assert outcome["final_response"] == "internal reasoning"

    def test_nous_401_refreshes_after_remint_and_retries(self, agent):
        """A 401 from Nous triggers a forced credential refresh, then one retry."""
        self._setup_agent(agent)
        agent.provider = "nous"
        agent.api_mode = "chat_completions"

        counters = {"api": 0, "refresh": 0}

        class _UnauthorizedError(RuntimeError):
            def __init__(self):
                super().__init__("Error code: 401 - unauthorized")
                self.status_code = 401

        def _fake_api_call(api_kwargs):
            counters["api"] += 1
            if counters["api"] == 1:
                raise _UnauthorizedError()
            return _mock_response(
                content="Recovered after remint", finish_reason="stop"
            )

        def _fake_refresh(*, force=True):
            counters["refresh"] += 1
            assert force is True
            return True

        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with (
            p_sess,
            p_traj,
            p_clean,
            patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call),
            patch.object(
                agent, "_try_refresh_nous_client_credentials", side_effect=_fake_refresh
            ),
        ):
            outcome = agent.run_conversation("hello")

        assert counters["api"] == 2
        assert counters["refresh"] == 1
        assert outcome["completed"] is True
        assert outcome["final_response"] == "Recovered after remint"

    def test_context_compression_triggered(self, agent):
        """When compressor says should_compress, compression runs."""
        self._setup_agent(agent)
        agent.compression_enabled = True

        tc = _mock_tool_call(name="web_search", arguments="{}", call_id="c1")
        agent.client.chat.completions.create.side_effect = [
            _mock_response(content="", finish_reason="tool_calls", tool_calls=[tc]),
            _mock_response(content="All done", finish_reason="stop"),
        ]

        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with (
            patch("run_agent.handle_function_call", return_value="result"),
            patch.object(
                agent.context_compressor, "should_compress", return_value=True
            ),
            patch.object(agent, "_compress_context") as mock_compress,
            p_sess,
            p_traj,
            p_clean,
        ):
            # _compress_context should return (messages, system_prompt)
            mock_compress.return_value = (
                [{"role": "user", "content": "search something"}],
                "compressed system prompt",
            )
            outcome = agent.run_conversation("search something")
            mock_compress.assert_called_once()
        assert outcome["final_response"] == "All done"
        assert outcome["completed"] is True

    def test_glm_prompt_exceeds_max_length_triggers_compression(self, agent):
        """GLM/Z.AI uses 'Prompt exceeds max length' for context overflow."""
        self._setup_agent(agent)
        overflow_err = Exception(
            "Error code: 400 - {'error': {'code': '1261', 'message': 'Prompt exceeds max length'}}"
        )
        overflow_err.status_code = 400
        agent.client.chat.completions.create.side_effect = [
            overflow_err,
            _mock_response(content="Recovered after compression", finish_reason="stop"),
        ]
        prior_turns = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]

        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with (
            patch.object(agent, "_compress_context") as mock_compress,
            p_sess,
            p_traj,
            p_clean,
        ):
            mock_compress.return_value = (
                [{"role": "user", "content": "hello"}],
                "compressed system prompt",
            )
            outcome = agent.run_conversation("hello", conversation_history=prior_turns)

        mock_compress.assert_called_once()
        assert outcome["final_response"] == "Recovered after compression"
        assert outcome["completed"] is True

    def test_length_finish_reason_requests_continuation(self, agent):
        """Normal truncation (partial real content) triggers continuation."""
        self._setup_agent(agent)
        agent.client.chat.completions.create.side_effect = [
            _mock_response(content="Part 1 ", finish_reason="length"),
            _mock_response(content="Part 2", finish_reason="stop"),
        ]

        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with p_sess, p_traj, p_clean:
            outcome = agent.run_conversation("hello")

        assert outcome["completed"] is True
        assert outcome["api_calls"] == 2
        assert outcome["final_response"] == "Part 1 Part 2"

        # The continuation request is appended as a user message.
        second_call = agent.client.chat.completions.create.call_args_list[1]
        followup = second_call.kwargs["messages"][-1]
        assert followup["role"] == "user"
        assert "truncated by the output length limit" in followup["content"]

    def test_length_thinking_exhausted_skips_continuation(self, agent):
        """When finish_reason='length' but content is only thinking, skip retries."""
        self._setup_agent(agent)
        agent.client.chat.completions.create.return_value = _mock_response(
            content="<think>internal reasoning</think>",
            finish_reason="length",
        )

        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with p_sess, p_traj, p_clean:
            outcome = agent.run_conversation("hello")

        # Should return immediately — no continuation, only 1 API call
        assert outcome["completed"] is False
        assert outcome["api_calls"] == 1
        assert "reasoning" in outcome["error"].lower()
        assert "output tokens" in outcome["error"].lower()
        # Should have a user-friendly response (not None)
        assert outcome["final_response"] is not None
        assert "Thinking Budget Exhausted" in outcome["final_response"]
        assert "/thinkon" in outcome["final_response"]

    def test_length_empty_content_detected_as_thinking_exhausted(self, agent):
        """When finish_reason='length' and content is None/empty, detect exhaustion."""
        self._setup_agent(agent)
        agent.client.chat.completions.create.return_value = _mock_response(
            content=None, finish_reason="length"
        )

        p_sess, p_traj, p_clean = self._persistence_patches(agent)
        with p_sess, p_traj, p_clean:
            outcome = agent.run_conversation("hello")

        assert outcome["completed"] is False
        assert outcome["api_calls"] == 1
        assert "reasoning" in outcome["error"].lower()
        # User-friendly message is returned
        assert outcome["final_response"] is not None
        assert "Thinking Budget Exhausted" in outcome["final_response"]
|
|
|
|
|
|
class TestRetryExhaustion:
    """Regression: retry_count > max_retries was dead code (off-by-one).

    When retries were exhausted the condition never triggered, causing
    the loop to exit and fall through to response.choices[0] on an
    invalid response, raising IndexError.
    """

    def _setup_agent(self, agent):
        """Put the agent into a minimal deterministic configuration."""
        agent._cached_system_prompt = "You are helpful."
        agent._use_prompt_caching = False
        agent.tool_delay = 0
        agent.compression_enabled = False
        agent.save_trajectories = False

    @staticmethod
    def _make_fast_time_mock():
        """Return a mock time module where sleep loops exit instantly."""
        fake_time = MagicMock()
        clock = [1000.0]

        def _advancing_time():
            # Jump 500s per call so any sleep_end deadline is already past.
            clock[0] += 500.0
            return clock[0]

        fake_time.time.side_effect = _advancing_time
        fake_time.sleep = MagicMock()  # no-op
        fake_time.monotonic.return_value = 12345.0
        return fake_time

    def test_invalid_response_returns_error_not_crash(self, agent):
        """Exhausted retries on invalid (empty choices) response must not IndexError."""
        self._setup_agent(agent)
        # Every API call yields a response whose choices list is empty.
        invalid_resp = SimpleNamespace(choices=[], model="test/model", usage=None)
        agent.client.chat.completions.create.return_value = invalid_resp
        with (
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
            patch("run_agent.time", self._make_fast_time_mock()),
        ):
            result = agent.run_conversation("hello")
        assert result.get("completed") is False, (
            f"Expected completed=False, got: {result}"
        )
        assert result.get("failed") is True
        assert "error" in result
        assert "Invalid API response" in result["error"]

    def test_api_error_returns_gracefully_after_retries(self, agent):
        """Exhausted retries on API errors must return error result, not crash."""
        self._setup_agent(agent)
        agent.client.chat.completions.create.side_effect = RuntimeError("rate limited")
        with (
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
            patch("run_agent.time", self._make_fast_time_mock()),
        ):
            result = agent.run_conversation("hello")
        assert result.get("completed") is False
        assert result.get("failed") is True
        assert "error" in result
        assert "rate limited" in result["error"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Flush sentinel leak
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestFlushSentinelNotLeaked:
    """_flush_sentinel must be stripped before sending messages to the API."""

    def test_flush_sentinel_stripped_from_api_messages(self, agent_with_memory_tool):
        """Verify _flush_sentinel is not sent to the API provider."""
        agent = agent_with_memory_tool
        agent._memory_store = MagicMock()
        agent._memory_flush_min_turns = 1
        agent._user_turn_count = 10
        agent._cached_system_prompt = "system"

        convo = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
            {"role": "user", "content": "remember this"},
        ]

        # Mocked API returns a trivial assistant reply with no tool calls.
        reply = SimpleNamespace(content="OK", tool_calls=None)
        agent.client.chat.completions.create.return_value = SimpleNamespace(
            choices=[SimpleNamespace(message=reply)]
        )

        # Bypass auxiliary client so flush uses agent.client directly
        with patch(
            "agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")
        ):
            agent.flush_memories(convo, min_turns=0)

        # Inspect exactly what reached the provider.
        call_args = agent.client.chat.completions.create.call_args
        assert call_args is not None, "flush_memories never called the API"
        api_messages = call_args.kwargs.get("messages") or call_args[1].get("messages")
        for msg in api_messages:
            assert "_flush_sentinel" not in msg, (
                f"_flush_sentinel leaked to API in message: {msg}"
            )
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Conversation history mutation
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestConversationHistoryNotMutated:
    """run_conversation must not mutate the caller's conversation_history list."""

    def test_caller_list_unchanged_after_run(self, agent):
        """Passing conversation_history should not modify the original list."""
        history = [
            {"role": "user", "content": "previous question"},
            {"role": "assistant", "content": "previous answer"},
        ]
        original_len = len(history)

        agent.client.chat.completions.create.return_value = _mock_response(
            content="new answer", finish_reason="stop"
        )

        with (
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
        ):
            result = agent.run_conversation(
                "new question", conversation_history=history
            )

        # Caller's list must be untouched
        assert len(history) == original_len, (
            f"conversation_history was mutated: expected {original_len} items, got {len(history)}"
        )
        # Result should have more messages than the original history
        assert len(result["messages"]) > original_len
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _max_tokens_param consistency
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestNousCredentialRefresh:
    """Verify Nous credential refresh rebuilds the runtime client."""

    def test_try_refresh_nous_client_credentials_rebuilds_client(
        self, agent, monkeypatch
    ):
        """Refresh must close the old client and rebuild one with re-minted creds."""
        agent.provider = "nous"
        agent.api_mode = "chat_completions"

        state = {"closed": False, "rebuild_kwargs": None}
        resolve_kwargs = {}

        class _ExistingClient:
            def close(self):
                state["closed"] = True

        class _RebuiltClient:
            pass

        def _fake_resolve(**kwargs):
            resolve_kwargs.update(kwargs)
            return {
                "api_key": "new-nous-key",
                "base_url": "https://inference-api.nousresearch.com/v1",
            }

        def _fake_openai(**kwargs):
            state["rebuild_kwargs"] = kwargs
            return _RebuiltClient()

        monkeypatch.setattr(
            "hermes_cli.auth.resolve_nous_runtime_credentials", _fake_resolve
        )

        agent.client = _ExistingClient()
        with patch("run_agent.OpenAI", side_effect=_fake_openai):
            refreshed = agent._try_refresh_nous_client_credentials(force=True)

        assert refreshed is True
        assert state["closed"] is True
        assert resolve_kwargs["force_mint"] is True
        rebuild_kwargs = state["rebuild_kwargs"]
        assert rebuild_kwargs["api_key"] == "new-nous-key"
        assert (
            rebuild_kwargs["base_url"] == "https://inference-api.nousresearch.com/v1"
        )
        assert "default_headers" not in rebuild_kwargs
        assert isinstance(agent.client, _RebuiltClient)
|
|
|
|
|
|
class TestMaxTokensParam:
    """Verify _max_tokens_param returns the correct key for each provider."""

    def test_returns_max_completion_tokens_for_direct_openai(self, agent):
        agent.base_url = "https://api.openai.com/v1"
        assert agent._max_tokens_param(4096) == {"max_completion_tokens": 4096}

    def test_returns_max_tokens_for_openrouter(self, agent):
        agent.base_url = "https://openrouter.ai/api/v1"
        assert agent._max_tokens_param(4096) == {"max_tokens": 4096}

    def test_returns_max_tokens_for_local(self, agent):
        agent.base_url = "http://localhost:11434/v1"
        assert agent._max_tokens_param(4096) == {"max_tokens": 4096}

    def test_not_tricked_by_openai_in_openrouter_url(self, agent):
        # "api.openai.com" embedded in an OpenRouter URL must not flip
        # the parameter name to max_completion_tokens.
        agent.base_url = "https://openrouter.ai/api/v1/api.openai.com"
        assert agent._max_tokens_param(4096) == {"max_tokens": 4096}
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# System prompt stability for prompt caching
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSystemPromptStability:
    """Verify that the system prompt stays stable across turns for cache hits."""

    def test_stored_prompt_reused_for_continuing_session(self, agent):
        """When conversation_history is non-empty and session DB has a stored
        prompt, it should be reused instead of rebuilding from disk."""
        stored = "You are helpful. [stored from turn 1]"
        mock_db = MagicMock()
        mock_db.get_session.return_value = {"system_prompt": stored}
        agent._session_db = mock_db

        # Simulate a continuing session with history
        history = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi"},
        ]

        # First call — _cached_system_prompt is None, history is non-empty
        agent._cached_system_prompt = None

        # Patch run_conversation internals to just test the system prompt logic.
        # We'll call the prompt caching block directly by simulating what
        # run_conversation does.
        conversation_history = history

        # The block under test (from run_conversation):
        if agent._cached_system_prompt is None:
            stored_prompt = None
            if conversation_history and agent._session_db:
                try:
                    session_row = agent._session_db.get_session(agent.session_id)
                    if session_row:
                        stored_prompt = session_row.get("system_prompt") or None
                except Exception:
                    pass

            if stored_prompt:
                agent._cached_system_prompt = stored_prompt

        assert agent._cached_system_prompt == stored
        mock_db.get_session.assert_called_once_with(agent.session_id)

    def test_fresh_build_when_no_history(self, agent):
        """On the first turn (no history), system prompt should be built fresh."""
        mock_db = MagicMock()
        agent._session_db = mock_db

        agent._cached_system_prompt = None
        conversation_history = []

        # The block under test:
        if agent._cached_system_prompt is None:
            stored_prompt = None
            if conversation_history and agent._session_db:
                session_row = agent._session_db.get_session(agent.session_id)
                if session_row:
                    stored_prompt = session_row.get("system_prompt") or None

            if stored_prompt:
                agent._cached_system_prompt = stored_prompt
            else:
                agent._cached_system_prompt = agent._build_system_prompt()

        # Should have built fresh, not queried the DB
        mock_db.get_session.assert_not_called()
        assert agent._cached_system_prompt is not None
        assert "Hermes Agent" in agent._cached_system_prompt

    def test_fresh_build_when_db_has_no_prompt(self, agent):
        """If the session DB has no stored prompt, build fresh even with history."""
        mock_db = MagicMock()
        mock_db.get_session.return_value = {"system_prompt": ""}
        agent._session_db = mock_db

        agent._cached_system_prompt = None
        conversation_history = [{"role": "user", "content": "hi"}]

        if agent._cached_system_prompt is None:
            stored_prompt = None
            if conversation_history and agent._session_db:
                try:
                    session_row = agent._session_db.get_session(agent.session_id)
                    if session_row:
                        stored_prompt = session_row.get("system_prompt") or None
                except Exception:
                    pass

            if stored_prompt:
                agent._cached_system_prompt = stored_prompt
            else:
                agent._cached_system_prompt = agent._build_system_prompt()

        # Empty string is falsy, so should fall through to fresh build
        assert "Hermes Agent" in agent._cached_system_prompt

    def test_honcho_context_baked_into_prompt_on_first_turn(self, agent):
        """Honcho context should be baked into _cached_system_prompt on
        the first turn, not injected separately per API call."""
        agent._honcho_context = "User prefers Python over JavaScript."
        agent._cached_system_prompt = None

        # Simulate first turn: build fresh and bake in Honcho
        agent._cached_system_prompt = agent._build_system_prompt()
        if agent._honcho_context:
            agent._cached_system_prompt = (
                agent._cached_system_prompt + "\n\n" + agent._honcho_context
            ).strip()

        assert "User prefers Python over JavaScript" in agent._cached_system_prompt

    def test_honcho_prefetch_runs_on_continuing_session(self):
        """Honcho prefetch is consumed on continuing sessions via ephemeral context."""
        conversation_history = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi there"},
        ]
        recall_mode = "hybrid"
        should_prefetch = bool(conversation_history) and recall_mode != "tools"
        assert should_prefetch is True

    def test_inject_honcho_turn_context_appends_system_note(self):
        """_inject_honcho_turn_context keeps the user text and appends the memo."""
        content = _inject_honcho_turn_context("hello", "## Honcho Memory\nprior context")
        assert "hello" in content
        assert "Honcho memory was retrieved from prior sessions" in content
        assert "## Honcho Memory" in content

    def test_honcho_continuing_session_keeps_turn_context_out_of_system_prompt(self, agent):
        """On a continuing session, prefetched Honcho memory rides along in the
        current user turn, never mutating the cached system prompt."""
        captured = {}

        def _fake_api_call(api_kwargs):
            captured.update(api_kwargs)
            return _mock_response(content="done", finish_reason="stop")

        agent._honcho = object()
        agent._honcho_session_key = "session-1"
        agent._honcho_config = SimpleNamespace(
            ai_peer="hermes",
            memory_mode="hybrid",
            write_frequency="async",
            recall_mode="hybrid",
        )
        agent._use_prompt_caching = False
        conversation_history = [
            {"role": "user", "content": "hello"},
            {"role": "assistant", "content": "hi there"},
        ]

        with (
            patch.object(agent, "_honcho_prefetch", return_value="## Honcho Memory\nprior context"),
            patch.object(agent, "_queue_honcho_prefetch"),
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
            patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call),
        ):
            result = agent.run_conversation("what were we doing?", conversation_history=conversation_history)

        assert result["completed"] is True
        api_messages = captured["messages"]
        assert api_messages[0]["role"] == "system"
        assert "prior context" not in api_messages[0]["content"]
        current_user = api_messages[-1]
        assert current_user["role"] == "user"
        assert "what were we doing?" in current_user["content"]
        assert "prior context" in current_user["content"]
        assert "Honcho memory was retrieved from prior sessions" in current_user["content"]

    def test_honcho_prefetch_runs_on_first_turn(self):
        """Honcho prefetch should run when conversation_history is empty."""
        conversation_history = []
        should_prefetch = not conversation_history
        assert should_prefetch is True

    def test_run_conversation_can_skip_honcho_sync_for_synthetic_turns(self, agent):
        """sync_honcho=False suppresses both the Honcho write and the prefetch."""
        captured = {}

        def _fake_api_call(api_kwargs):
            captured.update(api_kwargs)
            return _mock_response(content="done", finish_reason="stop")

        agent._honcho = MagicMock()
        agent._honcho_session_key = "session-1"
        agent._honcho_config = SimpleNamespace(
            ai_peer="hermes",
            memory_mode="hybrid",
            write_frequency="async",
            recall_mode="hybrid",
        )
        agent._use_prompt_caching = False

        with (
            patch.object(agent, "_honcho_sync") as mock_sync,
            patch.object(agent, "_queue_honcho_prefetch") as mock_prefetch,
            patch.object(agent, "_persist_session"),
            patch.object(agent, "_save_trajectory"),
            patch.object(agent, "_cleanup_task_resources"),
            patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call),
        ):
            result = agent.run_conversation("synthetic flush turn", sync_honcho=False)

        assert result["completed"] is True
        assert captured["messages"][-1]["content"] == "synthetic flush turn"
        mock_sync.assert_not_called()
        mock_prefetch.assert_not_called()
class TestHonchoActivation:
    """Honcho client activation paths during AIAgent.__init__."""

    def test_disabled_config_skips_honcho_init(self):
        """enabled=False leaves agent._honcho unset and never builds a client."""
        hcfg = HonchoClientConfig(
            enabled=False,
            api_key="honcho-key",
            peer_name="user",
            ai_peer="hermes",
        )

        with (
            patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
            patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
            patch("honcho_integration.client.get_honcho_client") as mock_client,
        ):
            agent = AIAgent(
                api_key="test-key-1234567890",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=False,
            )

        assert agent._honcho is None
        assert agent._honcho_config is hcfg
        mock_client.assert_not_called()

    def test_injected_honcho_manager_skips_fresh_client_init(self):
        """A pre-built Honcho manager is used as-is: session created, prefetch
        consumed, and no fresh client construction."""
        hcfg = HonchoClientConfig(
            enabled=True,
            api_key="honcho-key",
            memory_mode="hybrid",
            peer_name="user",
            ai_peer="hermes",
            recall_mode="hybrid",
        )
        manager = MagicMock()
        manager._config = hcfg
        manager.get_or_create.return_value = SimpleNamespace(messages=[])
        manager.get_prefetch_context.return_value = {"representation": "Known user", "card": ""}

        with (
            patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
            patch("honcho_integration.client.get_honcho_client") as mock_client,
            patch("tools.honcho_tools.set_session_context"),
        ):
            agent = AIAgent(
                api_key="test-key-1234567890",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=False,
                honcho_session_key="gateway-session",
                honcho_manager=manager,
                honcho_config=hcfg,
            )

        assert agent._honcho is manager
        manager.get_or_create.assert_called_once_with("gateway-session")
        manager.get_prefetch_context.assert_called_once_with("gateway-session")
        manager.set_context_result.assert_called_once_with(
            "gateway-session",
            {"representation": "Known user", "card": ""},
        )
        mock_client.assert_not_called()

    def test_recall_mode_context_suppresses_honcho_tools(self):
        """recall_mode='context' strips the honcho_* tools from the valid set."""
        hcfg = HonchoClientConfig(
            enabled=True,
            api_key="honcho-key",
            memory_mode="hybrid",
            peer_name="user",
            ai_peer="hermes",
            recall_mode="context",
        )
        manager = MagicMock()
        manager._config = hcfg
        manager.get_or_create.return_value = SimpleNamespace(messages=[])
        manager.get_prefetch_context.return_value = {"representation": "Known user", "card": ""}

        with (
            patch(
                "run_agent.get_tool_definitions",
                side_effect=[
                    _make_tool_defs("web_search"),
                    _make_tool_defs(
                        "web_search",
                        "honcho_context",
                        "honcho_profile",
                        "honcho_search",
                        "honcho_conclude",
                    ),
                ],
            ),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
            patch("tools.honcho_tools.set_session_context"),
        ):
            agent = AIAgent(
                api_key="test-key-1234567890",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=False,
                honcho_session_key="gateway-session",
                honcho_manager=manager,
                honcho_config=hcfg,
            )

        assert "web_search" in agent.valid_tool_names
        assert "honcho_context" not in agent.valid_tool_names
        assert "honcho_profile" not in agent.valid_tool_names
        assert "honcho_search" not in agent.valid_tool_names
        assert "honcho_conclude" not in agent.valid_tool_names

    def test_inactive_honcho_strips_stale_honcho_tools(self):
        """Honcho tools leaking in from tool definitions are removed when
        Honcho is disabled."""
        hcfg = HonchoClientConfig(
            enabled=False,
            api_key="honcho-key",
            peer_name="user",
            ai_peer="hermes",
        )

        with (
            patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search", "honcho_context")),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("run_agent.OpenAI"),
            patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
            patch("honcho_integration.client.get_honcho_client") as mock_client,
        ):
            agent = AIAgent(
                api_key="test-key-1234567890",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=False,
            )

        assert agent._honcho is None
        assert "web_search" in agent.valid_tool_names
        assert "honcho_context" not in agent.valid_tool_names
        mock_client.assert_not_called()
class TestHonchoPrefetchScheduling:
    """Scheduling rules for Honcho context/dialectic prefetch."""

    def test_honcho_prefetch_includes_cached_dialectic(self, agent):
        """A cached dialectic result is folded into the prefetch context."""
        agent._honcho = MagicMock()
        agent._honcho_session_key = "session-key"
        agent._honcho.pop_context_result.return_value = {}
        agent._honcho.pop_dialectic_result.return_value = "Continue with the migration checklist."

        context = agent._honcho_prefetch("what next?")

        assert "Continuity synthesis" in context
        assert "migration checklist" in context

    def test_queue_honcho_prefetch_skips_tools_mode(self, agent):
        """recall_mode='tools' disables background prefetch entirely."""
        agent._honcho = MagicMock()
        agent._honcho_session_key = "session-key"
        agent._honcho_config = HonchoClientConfig(
            enabled=True,
            api_key="honcho-key",
            recall_mode="tools",
        )

        agent._queue_honcho_prefetch("what next?")

        agent._honcho.prefetch_context.assert_not_called()
        agent._honcho.prefetch_dialectic.assert_not_called()

    def test_queue_honcho_prefetch_runs_when_context_enabled(self, agent):
        """recall_mode='hybrid' queues both context and dialectic prefetch."""
        agent._honcho = MagicMock()
        agent._honcho_session_key = "session-key"
        agent._honcho_config = HonchoClientConfig(
            enabled=True,
            api_key="honcho-key",
            recall_mode="hybrid",
        )

        agent._queue_honcho_prefetch("what next?")

        agent._honcho.prefetch_context.assert_called_once_with("session-key", "what next?")
        agent._honcho.prefetch_dialectic.assert_called_once_with("session-key", "what next?")
# ---------------------------------------------------------------------------
# Iteration budget pressure warnings
# ---------------------------------------------------------------------------

class TestBudgetPressure:
    """Budget pressure warning system (issue #414)."""

    def test_no_warning_below_caution(self, agent):
        """Below the caution threshold, no warning is produced."""
        agent.max_iterations = 60
        assert agent._get_budget_warning(30) is None

    def test_caution_at_70_percent(self, agent):
        """At 70% usage, a [BUDGET:] caution names the remaining iterations."""
        agent.max_iterations = 60
        msg = agent._get_budget_warning(42)
        assert msg is not None
        assert "[BUDGET:" in msg
        assert "18 iterations left" in msg

    def test_warning_at_90_percent(self, agent):
        """At 90% usage, the stronger [BUDGET WARNING:] message fires."""
        agent.max_iterations = 60
        msg = agent._get_budget_warning(54)
        assert "[BUDGET WARNING:" in msg
        assert "Provide your final response NOW" in msg

    def test_last_iteration(self, agent):
        """The final iteration reports exactly one iteration remaining."""
        agent.max_iterations = 60
        msg = agent._get_budget_warning(59)
        assert "1 iteration(s) left" in msg

    def test_disabled(self, agent):
        """The feature flag suppresses warnings even under heavy usage."""
        agent.max_iterations = 60
        agent._budget_pressure_enabled = False
        assert agent._get_budget_warning(55) is None

    def test_zero_max_iterations(self, agent):
        """max_iterations == 0 must not divide by zero; no warning emitted."""
        agent.max_iterations = 0
        assert agent._get_budget_warning(0) is None

    def test_injects_into_json_tool_result(self, agent):
        """Warning should be injected as _budget_warning field in JSON tool results."""
        import json
        agent.max_iterations = 10
        messages = [
            {"role": "tool", "content": json.dumps({"output": "done", "exit_code": 0}), "tool_call_id": "tc1"}
        ]
        warning = agent._get_budget_warning(9)
        assert warning is not None
        # Simulate the injection logic
        last_content = messages[-1]["content"]
        parsed = json.loads(last_content)
        parsed["_budget_warning"] = warning
        messages[-1]["content"] = json.dumps(parsed, ensure_ascii=False)
        result = json.loads(messages[-1]["content"])
        assert "_budget_warning" in result
        assert "BUDGET WARNING" in result["_budget_warning"]
        assert result["output"] == "done"  # original content preserved

    def test_appends_to_non_json_tool_result(self, agent):
        """Warning should be appended as text for non-JSON tool results."""
        agent.max_iterations = 10
        messages = [
            {"role": "tool", "content": "plain text result", "tool_call_id": "tc1"}
        ]
        warning = agent._get_budget_warning(9)
        # Simulate injection logic for non-JSON
        last_content = messages[-1]["content"]
        try:
            import json
            json.loads(last_content)
        except (json.JSONDecodeError, TypeError):
            messages[-1]["content"] = last_content + f"\n\n{warning}"
        assert "plain text result" in messages[-1]["content"]
        assert "BUDGET WARNING" in messages[-1]["content"]
class TestSafeWriter:
    """Verify _SafeWriter guards stdout against OSError (broken pipes)."""

    def test_write_delegates_normally(self):
        """When stdout is healthy, _SafeWriter is transparent."""
        from run_agent import _SafeWriter
        from io import StringIO
        inner = StringIO()
        writer = _SafeWriter(inner)
        writer.write("hello")
        assert inner.getvalue() == "hello"

    def test_write_catches_oserror(self):
        """OSError on write is silently caught, returns len(data)."""
        from run_agent import _SafeWriter
        from unittest.mock import MagicMock
        inner = MagicMock()
        inner.write.side_effect = OSError(5, "Input/output error")
        writer = _SafeWriter(inner)
        result = writer.write("hello")
        assert result == 5  # len("hello")

    def test_flush_catches_oserror(self):
        """OSError on flush is silently caught."""
        from run_agent import _SafeWriter
        from unittest.mock import MagicMock
        inner = MagicMock()
        inner.flush.side_effect = OSError(5, "Input/output error")
        writer = _SafeWriter(inner)
        writer.flush()  # should not raise

    def test_print_survives_broken_stdout(self, monkeypatch):
        """print() through _SafeWriter doesn't crash on broken pipe."""
        import sys
        from run_agent import _SafeWriter
        from unittest.mock import MagicMock
        broken = MagicMock()
        broken.write.side_effect = OSError(5, "Input/output error")
        original = sys.stdout
        sys.stdout = _SafeWriter(broken)
        try:
            print("this should not crash")  # would raise without _SafeWriter
        finally:
            sys.stdout = original

    def test_installed_in_run_conversation(self, agent):
        """run_conversation installs _SafeWriter on stdio."""
        import sys
        from run_agent import _SafeWriter
        resp = _mock_response(content="Done", finish_reason="stop")
        agent.client.chat.completions.create.return_value = resp
        original_stdout = sys.stdout
        original_stderr = sys.stderr
        try:
            with (
                patch.object(agent, "_persist_session"),
                patch.object(agent, "_save_trajectory"),
                patch.object(agent, "_cleanup_task_resources"),
            ):
                agent.run_conversation("test")
            assert isinstance(sys.stdout, _SafeWriter)
            assert isinstance(sys.stderr, _SafeWriter)
        finally:
            sys.stdout = original_stdout
            sys.stderr = original_stderr

    def test_installed_before_init_time_honcho_error_prints(self):
        """AIAgent.__init__ wraps stdout before Honcho fallback prints can fire."""
        import sys
        from run_agent import _SafeWriter

        broken = MagicMock()
        broken.write.side_effect = OSError(5, "Input/output error")
        broken.flush.side_effect = OSError(5, "Input/output error")

        original = sys.stdout
        sys.stdout = broken
        try:
            hcfg = HonchoClientConfig(enabled=True, api_key="test-honcho-key")
            with (
                patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
                patch("run_agent.check_toolset_requirements", return_value={}),
                patch("run_agent.OpenAI"),
                patch("hermes_cli.config.load_config", return_value={"memory": {}}),
                patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
                patch("honcho_integration.client.get_honcho_client", side_effect=RuntimeError("boom")),
            ):
                agent = AIAgent(
                    api_key="test-k...7890",
                    quiet_mode=True,
                    skip_context_files=True,
                    skip_memory=False,
                )

            assert isinstance(sys.stdout, _SafeWriter)
            assert agent._honcho is None
        finally:
            sys.stdout = original

    def test_double_wrap_prevented(self):
        """Wrapping an already-wrapped stream doesn't add layers."""
        import sys
        from run_agent import _SafeWriter
        from io import StringIO
        inner = StringIO()
        wrapped = _SafeWriter(inner)
        # isinstance check should prevent double-wrapping
        assert isinstance(wrapped, _SafeWriter)
        # The guard in run_conversation checks isinstance before wrapping
        if not isinstance(wrapped, _SafeWriter):
            wrapped = _SafeWriter(wrapped)
        # Still just one layer
        wrapped.write("test")
        assert inner.getvalue() == "test"
class TestSaveSessionLogAtomicWrite:
    """_save_session_log must route through the shared atomic JSON writer."""

    def test_uses_shared_atomic_json_helper(self, agent, tmp_path):
        """The payload, indent, and default=str serializer all reach the helper."""
        agent.session_log_file = tmp_path / "session.json"
        messages = [{"role": "user", "content": "hello"}]

        with patch("run_agent.atomic_json_write", create=True) as mock_atomic_write:
            agent._save_session_log(messages)

        mock_atomic_write.assert_called_once()
        call_args = mock_atomic_write.call_args
        assert call_args.args[0] == agent.session_log_file
        payload = call_args.args[1]
        assert payload["session_id"] == agent.session_id
        assert payload["messages"] == messages
        assert call_args.kwargs["indent"] == 2
        assert call_args.kwargs["default"] is str
# ===================================================================
# Anthropic adapter integration fixes
# ===================================================================

class TestBuildApiKwargsAnthropicMaxTokens:
    """Bug fix: max_tokens was always None for Anthropic mode, ignoring user config."""

    def test_max_tokens_passed_to_anthropic(self, agent):
        """A configured max_tokens reaches build_anthropic_kwargs."""
        agent.api_mode = "anthropic_messages"
        agent.max_tokens = 4096
        agent.reasoning_config = None

        with patch("agent.anthropic_adapter.build_anthropic_kwargs") as mock_build:
            mock_build.return_value = {"model": "claude-sonnet-4-20250514", "messages": [], "max_tokens": 4096}
            agent._build_api_kwargs([{"role": "user", "content": "test"}])
            _, kwargs = mock_build.call_args
            # The adapter may be called positionally or with keywords; map
            # positional args onto parameter names when kwargs is empty.
            if not kwargs:
                kwargs = dict(zip(
                    ["model", "messages", "tools", "max_tokens", "reasoning_config"],
                    mock_build.call_args[0],
                ))
            assert kwargs.get("max_tokens") == 4096 or mock_build.call_args[1].get("max_tokens") == 4096

    def test_max_tokens_none_when_unset(self, agent):
        """When unset, max_tokens=None is passed so the adapter default applies."""
        agent.api_mode = "anthropic_messages"
        agent.max_tokens = None
        agent.reasoning_config = None

        with patch("agent.anthropic_adapter.build_anthropic_kwargs") as mock_build:
            mock_build.return_value = {"model": "claude-sonnet-4-20250514", "messages": [], "max_tokens": 16384}
            agent._build_api_kwargs([{"role": "user", "content": "test"}])
            call_args = mock_build.call_args
            # max_tokens should be None (let adapter use its default)
            if call_args[1]:
                assert call_args[1].get("max_tokens") is None
            else:
                assert call_args[0][3] is None
class TestAnthropicImageFallback:
    """Multimodal user images are replaced by vision-analysis text for Anthropic."""

    def test_build_api_kwargs_converts_multimodal_user_image_to_text(self, agent):
        """An image_url part becomes inline text produced by vision_analyze_tool."""
        agent.api_mode = "anthropic_messages"
        agent.reasoning_config = None

        api_messages = [{
            "role": "user",
            "content": [
                {"type": "text", "text": "Can you see this now?"},
                {"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}},
            ],
        }]

        with (
            patch("tools.vision_tools.vision_analyze_tool", new=AsyncMock(return_value=json.dumps({"success": True, "analysis": "A cat sitting on a chair."}))),
            patch("agent.anthropic_adapter.build_anthropic_kwargs") as mock_build,
        ):
            mock_build.return_value = {"model": "claude-sonnet-4-20250514", "messages": [], "max_tokens": 4096}
            agent._build_api_kwargs(api_messages)

        # build_anthropic_kwargs may be called positionally or via keywords.
        kwargs = mock_build.call_args.kwargs or dict(zip(
            ["model", "messages", "tools", "max_tokens", "reasoning_config"],
            mock_build.call_args.args,
        ))
        transformed = kwargs["messages"]
        assert isinstance(transformed[0]["content"], str)
        assert "A cat sitting on a chair." in transformed[0]["content"]
        assert "Can you see this now?" in transformed[0]["content"]
        assert "vision_analyze with image_url: https://example.com/cat.png" in transformed[0]["content"]

    def test_build_api_kwargs_reuses_cached_image_analysis_for_duplicate_images(self, agent):
        """The same image appearing twice triggers only one vision call."""
        agent.api_mode = "anthropic_messages"
        agent.reasoning_config = None
        data_url = "data:image/png;base64,QUFBQQ=="

        api_messages = [
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "first"},
                    {"type": "input_image", "image_url": data_url},
                ],
            },
            {
                "role": "user",
                "content": [
                    {"type": "text", "text": "second"},
                    {"type": "input_image", "image_url": data_url},
                ],
            },
        ]

        mock_vision = AsyncMock(return_value=json.dumps({"success": True, "analysis": "A small test image."}))
        with (
            patch("tools.vision_tools.vision_analyze_tool", new=mock_vision),
            patch("agent.anthropic_adapter.build_anthropic_kwargs") as mock_build,
        ):
            mock_build.return_value = {"model": "claude-sonnet-4-20250514", "messages": [], "max_tokens": 4096}
            agent._build_api_kwargs(api_messages)

        assert mock_vision.await_count == 1
class TestFallbackAnthropicProvider:
    """Bug fix: _try_activate_fallback had no case for anthropic provider."""

    def test_fallback_to_anthropic_sets_api_mode(self, agent):
        """Falling back to Anthropic switches api_mode and drops the OpenAI client."""
        agent._fallback_activated = False
        agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-20250514"}

        mock_client = MagicMock()
        mock_client.base_url = "https://api.anthropic.com/v1"
        mock_client.api_key = "sk-ant-api03-test"

        with (
            patch("agent.auxiliary_client.resolve_provider_client", return_value=(mock_client, None)),
            patch("agent.anthropic_adapter.build_anthropic_client") as mock_build,
            patch("agent.anthropic_adapter.resolve_anthropic_token", return_value=None),
        ):
            mock_build.return_value = MagicMock()
            result = agent._try_activate_fallback()

        assert result is True
        assert agent.api_mode == "anthropic_messages"
        assert agent._anthropic_client is not None
        assert agent.client is None

    def test_fallback_to_anthropic_enables_prompt_caching(self, agent):
        """The Anthropic fallback path turns on prompt caching."""
        agent._fallback_activated = False
        agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-20250514"}

        mock_client = MagicMock()
        mock_client.base_url = "https://api.anthropic.com/v1"
        mock_client.api_key = "sk-ant-api03-test"

        with (
            patch("agent.auxiliary_client.resolve_provider_client", return_value=(mock_client, None)),
            patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
            patch("agent.anthropic_adapter.resolve_anthropic_token", return_value=None),
        ):
            agent._try_activate_fallback()

        assert agent._use_prompt_caching is True

    def test_fallback_to_openrouter_uses_openai_client(self, agent):
        """An OpenRouter fallback keeps the chat_completions mode and client."""
        agent._fallback_activated = False
        agent._fallback_model = {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}

        mock_client = MagicMock()
        mock_client.base_url = "https://openrouter.ai/api/v1"
        mock_client.api_key = "sk-or-test"

        with patch("agent.auxiliary_client.resolve_provider_client", return_value=(mock_client, None)):
            result = agent._try_activate_fallback()

        assert result is True
        assert agent.api_mode == "chat_completions"
        assert agent.client is mock_client
def test_aiagent_uses_copilot_acp_client():
    """provider='copilot-acp' builds a CopilotACPClient and never touches OpenAI."""
    with (
        patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
        patch("run_agent.check_toolset_requirements", return_value={}),
        patch("run_agent.OpenAI") as mock_openai,
        patch("agent.copilot_acp_client.CopilotACPClient") as mock_acp_client,
    ):
        acp_client = MagicMock()
        mock_acp_client.return_value = acp_client

        agent = AIAgent(
            api_key="copilot-acp",
            base_url="acp://copilot",
            provider="copilot-acp",
            acp_command="/usr/local/bin/copilot",
            acp_args=["--acp", "--stdio"],
            quiet_mode=True,
            skip_context_files=True,
            skip_memory=True,
        )

    assert agent.client is acp_client
    mock_openai.assert_not_called()
    mock_acp_client.assert_called_once()
    assert mock_acp_client.call_args.kwargs["base_url"] == "acp://copilot"
    assert mock_acp_client.call_args.kwargs["api_key"] == "copilot-acp"
    assert mock_acp_client.call_args.kwargs["command"] == "/usr/local/bin/copilot"
    assert mock_acp_client.call_args.kwargs["args"] == ["--acp", "--stdio"]
def test_is_openai_client_closed_honors_custom_client_flag():
    """_is_openai_client_closed reads a plain is_closed attribute on any client."""
    assert AIAgent._is_openai_client_closed(SimpleNamespace(is_closed=True)) is True
    assert AIAgent._is_openai_client_closed(SimpleNamespace(is_closed=False)) is False
class TestAnthropicBaseUrlPassthrough:
    """Bug fix: base_url was filtered with 'anthropic in base_url', blocking proxies."""

    def test_custom_proxy_base_url_passed_through(self):
        """A non-Anthropic proxy URL must reach build_anthropic_client unfiltered."""
        with (
            patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("agent.anthropic_adapter.build_anthropic_client") as mock_build,
        ):
            mock_build.return_value = MagicMock()
            a = AIAgent(
                api_key="sk-ant-api03-test1234567890",
                base_url="https://llm-proxy.company.com/v1",
                api_mode="anthropic_messages",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            call_args = mock_build.call_args
            # base_url should be passed through, not filtered out
            assert call_args[0][1] == "https://llm-proxy.company.com/v1"

    def test_none_base_url_passed_as_none(self):
        """Without a base_url, the adapter receives an empty/None URL."""
        with (
            patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("agent.anthropic_adapter.build_anthropic_client") as mock_build,
        ):
            mock_build.return_value = MagicMock()
            a = AIAgent(
                api_key="sk-ant-api03-test1234567890",
                api_mode="anthropic_messages",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )
            call_args = mock_build.call_args
            # No base_url provided, should be default empty string or None
            passed_url = call_args[0][1]
            assert not passed_url or passed_url is None
class TestAnthropicCredentialRefresh:
    """Credential refresh for the Anthropic client.

    _try_refresh_anthropic_client_credentials must rebuild the client only
    when the resolved token actually changed, and _anthropic_messages_create
    must preflight a refresh before issuing the API call.
    """

    def test_try_refresh_anthropic_client_credentials_rebuilds_client(self):
        """A changed token closes the old client and rebuilds with the fresh one."""
        with (
            patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("agent.anthropic_adapter.build_anthropic_client") as mock_build,
        ):
            old_client = MagicMock()
            new_client = MagicMock()
            # First build is consumed by AIAgent.__init__; the second by the refresh.
            mock_build.side_effect = [old_client, new_client]
            agent = AIAgent(
                api_key="sk-ant-oat01-stale-token",
                api_mode="anthropic_messages",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )

        # Pin the agent's Anthropic state to known values before refreshing.
        agent._anthropic_client = old_client
        agent._anthropic_api_key = "sk-ant-oat01-stale-token"
        agent._anthropic_base_url = "https://api.anthropic.com"
        agent.provider = "anthropic"

        with (
            patch("agent.anthropic_adapter.resolve_anthropic_token", return_value="sk-ant-oat01-fresh-token"),
            patch("agent.anthropic_adapter.build_anthropic_client", return_value=new_client) as rebuild,
        ):
            assert agent._try_refresh_anthropic_client_credentials() is True

        # The stale client is closed, and the rebuilt client uses the fresh
        # token against the same base URL.
        old_client.close.assert_called_once()
        rebuild.assert_called_once_with("sk-ant-oat01-fresh-token", "https://api.anthropic.com")
        assert agent._anthropic_client is new_client
        assert agent._anthropic_api_key == "sk-ant-oat01-fresh-token"

    def test_try_refresh_anthropic_client_credentials_returns_false_when_token_unchanged(self):
        """An unchanged token must be a no-op: no close, no rebuild."""
        with (
            patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
        ):
            agent = AIAgent(
                api_key="sk-ant-oat01-same-token",
                api_mode="anthropic_messages",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )

        old_client = MagicMock()
        agent._anthropic_client = old_client
        agent._anthropic_api_key = "sk-ant-oat01-same-token"

        # resolve_anthropic_token returns the same token the agent already holds.
        with (
            patch("agent.anthropic_adapter.resolve_anthropic_token", return_value="sk-ant-oat01-same-token"),
            patch("agent.anthropic_adapter.build_anthropic_client") as rebuild,
        ):
            assert agent._try_refresh_anthropic_client_credentials() is False

        old_client.close.assert_not_called()
        rebuild.assert_not_called()

    def test_anthropic_messages_create_preflights_refresh(self):
        """_anthropic_messages_create refreshes credentials before the API call."""
        with (
            patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
            patch("run_agent.check_toolset_requirements", return_value={}),
            patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
        ):
            agent = AIAgent(
                api_key="sk-ant-oat01-current-token",
                api_mode="anthropic_messages",
                quiet_mode=True,
                skip_context_files=True,
                skip_memory=True,
            )

        response = SimpleNamespace(content=[])
        agent._anthropic_client = MagicMock()
        agent._anthropic_client.messages.create.return_value = response

        with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=True) as refresh:
            result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"})

        # Refresh is attempted exactly once, then the payload dict is expanded
        # into keyword arguments for messages.create.
        refresh.assert_called_once_with()
        agent._anthropic_client.messages.create.assert_called_once_with(model="claude-sonnet-4-20250514")
        assert result is response
|
|
|
|
|
|
# ===================================================================
|
|
# _streaming_api_call tests
|
|
# ===================================================================
|
|
|
|
def _make_chunk(content=None, tool_calls=None, finish_reason=None, model="test/model"):
|
|
"""Build a SimpleNamespace mimicking an OpenAI streaming chunk."""
|
|
delta = SimpleNamespace(content=content, tool_calls=tool_calls)
|
|
choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
|
|
return SimpleNamespace(model=model, choices=[choice])
|
|
|
|
|
|
def _make_tc_delta(index=0, tc_id=None, name=None, arguments=None):
|
|
"""Build a SimpleNamespace mimicking a streaming tool_call delta."""
|
|
func = SimpleNamespace(name=name, arguments=arguments)
|
|
return SimpleNamespace(index=index, id=tc_id, function=func)
|
|
|
|
|
|
class TestStreamingApiCall:
    """Tests for _streaming_api_call — voice TTS streaming pipeline."""

    def test_content_assembly(self, agent):
        """Text deltas are concatenated in order and forwarded to the delta callback."""
        chunks = [
            _make_chunk(content="Hel"),
            _make_chunk(content="lo "),
            _make_chunk(content="World"),
            _make_chunk(finish_reason="stop"),
        ]
        agent.client.chat.completions.create.return_value = iter(chunks)
        callback = MagicMock()
        agent.stream_delta_callback = callback

        resp = agent._interruptible_streaming_api_call({"messages": []})

        assert resp.choices[0].message.content == "Hello World"
        assert resp.choices[0].finish_reason == "stop"
        # One callback invocation per content delta, in stream order.
        assert callback.call_count == 3
        callback.assert_any_call("Hel")
        callback.assert_any_call("lo ")
        callback.assert_any_call("World")

    def test_tool_call_accumulation(self, agent):
        """Partial name/argument fragments at the same index merge into one tool call."""
        chunks = [
            _make_chunk(tool_calls=[_make_tc_delta(0, "call_1", "web_", '{"q":')]),
            _make_chunk(tool_calls=[_make_tc_delta(0, None, "search", '"test"}')]),
            _make_chunk(finish_reason="tool_calls"),
        ]
        agent.client.chat.completions.create.return_value = iter(chunks)

        resp = agent._interruptible_streaming_api_call({"messages": []})

        tc = resp.choices[0].message.tool_calls
        assert len(tc) == 1
        assert tc[0].function.name == "web_search"
        assert tc[0].function.arguments == '{"q":"test"}'
        # The id comes from the first delta (later deltas carry id=None).
        assert tc[0].id == "call_1"

    def test_multiple_tool_calls(self, agent):
        """Deltas with distinct indices produce distinct tool calls."""
        chunks = [
            _make_chunk(tool_calls=[_make_tc_delta(0, "call_a", "search", '{}')]),
            _make_chunk(tool_calls=[_make_tc_delta(1, "call_b", "read", '{}')]),
            _make_chunk(finish_reason="tool_calls"),
        ]
        agent.client.chat.completions.create.return_value = iter(chunks)

        resp = agent._interruptible_streaming_api_call({"messages": []})

        tc = resp.choices[0].message.tool_calls
        assert len(tc) == 2
        assert tc[0].function.name == "search"
        assert tc[1].function.name == "read"

    def test_content_and_tool_calls_together(self, agent):
        """A stream mixing text and tool-call deltas keeps both in the response."""
        chunks = [
            _make_chunk(content="I'll search"),
            _make_chunk(tool_calls=[_make_tc_delta(0, "call_1", "search", '{}')]),
            _make_chunk(finish_reason="tool_calls"),
        ]
        agent.client.chat.completions.create.return_value = iter(chunks)

        resp = agent._interruptible_streaming_api_call({"messages": []})

        assert resp.choices[0].message.content == "I'll search"
        assert len(resp.choices[0].message.tool_calls) == 1

    def test_empty_content_returns_none(self, agent):
        """A stream with no deltas yields None content and None tool_calls."""
        chunks = [_make_chunk(finish_reason="stop")]
        agent.client.chat.completions.create.return_value = iter(chunks)

        resp = agent._interruptible_streaming_api_call({"messages": []})

        assert resp.choices[0].message.content is None
        assert resp.choices[0].message.tool_calls is None

    def test_callback_exception_swallowed(self, agent):
        """A raising delta callback must not break content assembly."""
        chunks = [
            _make_chunk(content="Hello"),
            _make_chunk(content=" World"),
            _make_chunk(finish_reason="stop"),
        ]
        agent.client.chat.completions.create.return_value = iter(chunks)
        agent.stream_delta_callback = MagicMock(side_effect=ValueError("boom"))

        resp = agent._interruptible_streaming_api_call({"messages": []})

        assert resp.choices[0].message.content == "Hello World"

    def test_model_name_captured(self, agent):
        """The model name seen on chunks is surfaced on the assembled response."""
        chunks = [
            _make_chunk(content="Hi", model="gpt-4o"),
            _make_chunk(finish_reason="stop", model="gpt-4o"),
        ]
        agent.client.chat.completions.create.return_value = iter(chunks)

        resp = agent._interruptible_streaming_api_call({"messages": []})

        assert resp.model == "gpt-4o"

    def test_stream_kwarg_injected(self, agent):
        """The call must force stream=True on the underlying create() call."""
        chunks = [_make_chunk(content="x"), _make_chunk(finish_reason="stop")]
        agent.client.chat.completions.create.return_value = iter(chunks)

        agent._interruptible_streaming_api_call({"messages": [], "model": "test"})

        call_kwargs = agent.client.chat.completions.create.call_args
        # Accept either positional-dict or kwargs form of the mock call record.
        assert call_kwargs[1].get("stream") is True or call_kwargs.kwargs.get("stream") is True

    def test_api_exception_falls_back_to_non_streaming(self, agent):
        """When streaming fails before any deltas, fallback to non-streaming is attempted."""
        agent.client.chat.completions.create.side_effect = ConnectionError("fail")
        # The fallback also uses the same client, so it'll fail too
        with pytest.raises(ConnectionError, match="fail"):
            agent._interruptible_streaming_api_call({"messages": []})

    def test_response_has_uuid_id(self, agent):
        """Assembled responses carry a synthetic 'stream-<uuid>' id."""
        chunks = [_make_chunk(content="x"), _make_chunk(finish_reason="stop")]
        agent.client.chat.completions.create.return_value = iter(chunks)

        resp = agent._interruptible_streaming_api_call({"messages": []})

        assert resp.id.startswith("stream-")
        assert len(resp.id) > len("stream-")

    def test_empty_choices_chunk_skipped(self, agent):
        """Chunks with an empty choices list (e.g. usage-only) are ignored."""
        empty_chunk = SimpleNamespace(model="gpt-4", choices=[])
        chunks = [
            empty_chunk,
            _make_chunk(content="Hello", model="gpt-4"),
            _make_chunk(finish_reason="stop", model="gpt-4"),
        ]
        agent.client.chat.completions.create.return_value = iter(chunks)

        resp = agent._interruptible_streaming_api_call({"messages": []})

        assert resp.choices[0].message.content == "Hello"
        assert resp.model == "gpt-4"
|
|
|
|
|
|
# ===================================================================
|
|
# Interrupt _vprint force=True verification
|
|
# ===================================================================
|
|
|
|
|
|
class TestInterruptVprintForceTrue:
    """All interrupt _vprint calls must use force=True so they are always visible."""

    def test_all_interrupt_vprint_have_force_true(self):
        """Scan source for _vprint calls containing 'Interrupt' — each must have force=True."""
        import inspect

        source_lines = inspect.getsource(AIAgent).split("\n")
        violations = [
            f"line {lineno}: {text.strip()}"
            for lineno, text in enumerate(source_lines, 1)
            if "_vprint(" in text and "Interrupt" in text and "force=True" not in text
        ]
        assert not violations, (
            f"Interrupt _vprint calls missing force=True:\n"
            + "\n".join(violations)
        )
|
|
|
|
|
|
# ===================================================================
|
|
# Anthropic interrupt handler in _interruptible_api_call
|
|
# ===================================================================
|
|
|
|
|
|
class TestAnthropicInterruptHandler:
    """_interruptible_api_call must handle Anthropic mode when interrupted."""

    @staticmethod
    def _source_of(func):
        """Return the source text of *func* for substring checks."""
        import inspect
        return inspect.getsource(func)

    def test_interruptible_has_anthropic_branch(self):
        """The interrupt handler must check api_mode == 'anthropic_messages'."""
        src = self._source_of(AIAgent._interruptible_api_call)
        assert "anthropic_messages" in src, \
            "_interruptible_api_call must handle Anthropic interrupt (api_mode check)"

    def test_interruptible_rebuilds_anthropic_client(self):
        """After interrupting, the Anthropic client should be rebuilt."""
        src = self._source_of(AIAgent._interruptible_api_call)
        assert "build_anthropic_client" in src, \
            "_interruptible_api_call must rebuild Anthropic client after interrupt"

    def test_streaming_has_anthropic_branch(self):
        """_streaming_api_call must also handle Anthropic interrupt."""
        src = self._source_of(AIAgent._interruptible_streaming_api_call)
        assert "anthropic_messages" in src, \
            "_streaming_api_call must handle Anthropic interrupt"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bugfix: stream_callback forwarding for non-streaming providers
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestStreamCallbackNonStreamingProvider:
    """When api_mode != chat_completions, stream_callback must still receive
    the response content so TTS works (batch delivery)."""

    def test_callback_receives_chat_completions_response(self, agent):
        """For chat_completions-shaped responses, callback gets content."""
        agent.api_mode = "anthropic_messages"
        mock_response = SimpleNamespace(
            choices=[SimpleNamespace(
                message=SimpleNamespace(content="Hello", tool_calls=None, reasoning_content=None),
                finish_reason="stop", index=0,
            )],
            usage=None, model="test", id="test-id",
        )
        agent._interruptible_api_call = MagicMock(return_value=mock_response)

        received = []
        cb = lambda delta: received.append(delta)
        agent._stream_callback = cb

        # NOTE(review): the block below is an inline copy of the production
        # forwarding snippet rather than a call into agent code. In anthropic
        # mode the chat_completions-shaped response has no text blocks, so
        # nothing is forwarded into `received` here (no assertion on it).
        _cb = getattr(agent, "_stream_callback", None)
        response = agent._interruptible_api_call({})
        if _cb is not None and response:
            try:
                if agent.api_mode == "anthropic_messages":
                    text_parts = [
                        block.text for block in getattr(response, "content", [])
                        if getattr(block, "type", None) == "text" and getattr(block, "text", None)
                    ]
                    content = " ".join(text_parts) if text_parts else None
                else:
                    content = response.choices[0].message.content
                if content:
                    _cb(content)
            except Exception:
                pass

        # Anthropic format not matched above; fallback via except
        # Test the actual code path by checking chat_completions branch
        received2 = []
        agent.api_mode = "some_other_mode"
        agent._stream_callback = lambda d: received2.append(d)
        _cb2 = agent._stream_callback
        if _cb2 is not None and mock_response:
            try:
                content = mock_response.choices[0].message.content
                if content:
                    _cb2(content)
            except Exception:
                pass
        assert received2 == ["Hello"]

    def test_callback_receives_anthropic_content(self, agent):
        """For Anthropic responses, text blocks are extracted and forwarded."""
        agent.api_mode = "anthropic_messages"
        mock_response = SimpleNamespace(
            content=[SimpleNamespace(type="text", text="Hello from Claude")],
            stop_reason="end_turn",
        )

        received = []
        cb = lambda d: received.append(d)
        agent._stream_callback = cb
        _cb = agent._stream_callback

        # Mirrors the production forwarding logic: extract text blocks from
        # the Anthropic-shaped response and batch-forward them to the callback.
        if _cb is not None and mock_response:
            try:
                if agent.api_mode == "anthropic_messages":
                    text_parts = [
                        block.text for block in getattr(mock_response, "content", [])
                        if getattr(block, "type", None) == "text" and getattr(block, "text", None)
                    ]
                    content = " ".join(text_parts) if text_parts else None
                else:
                    content = mock_response.choices[0].message.content
                if content:
                    _cb(content)
            except Exception:
                pass

        assert received == ["Hello from Claude"]
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bugfix: API-only user message prefixes must not persist
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestPersistUserMessageOverride:
    """Synthetic API-only user prefixes should never leak into transcripts."""

    def test_persist_session_rewrites_current_turn_user_message(self, agent):
        """_persist_session must replace the prefixed API-only user content
        with the clean override text in memory, in the saved session log,
        and in the session DB write."""
        agent._session_db = MagicMock()
        agent.session_id = "session-123"
        agent._last_flushed_db_idx = 0
        agent._persist_user_message_idx = 0
        # The clean text the user actually provided, without the voice prefix.
        agent._persist_user_message_override = "Hello there"
        messages = [
            {
                "role": "user",
                "content": (
                    "[Voice input — respond concisely and conversationally, "
                    "2-3 sentences max. No code blocks or markdown.] Hello there"
                ),
            },
            {"role": "assistant", "content": "Hi!"},
        ]

        with patch.object(agent, "_save_session_log") as mock_save:
            agent._persist_session(messages, [])

        # In-memory message rewritten in place...
        assert messages[0]["content"] == "Hello there"
        # ...the session-log copy carries the clean text...
        saved_messages = mock_save.call_args.args[0]
        assert saved_messages[0]["content"] == "Hello there"
        # ...and so does the first DB append.
        first_db_write = agent._session_db.append_message.call_args_list[0].kwargs
        assert first_db_write["content"] == "Hello there"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Bugfix: _vprint force=True on error messages during TTS
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestVprintForceOnErrors:
    """Error/warning messages must be visible during streaming TTS."""

    def test_forced_message_shown_during_tts(self, agent):
        """force=True messages print even while a stream callback is active."""
        agent._stream_callback = lambda x: None
        captured = []

        def record(*args, **kwargs):
            captured.append(args)

        with patch("builtins.print", side_effect=record):
            agent._vprint("error msg", force=True)
        assert len(captured) == 1

    def test_non_forced_suppressed_during_tts(self, agent):
        """Plain messages are swallowed while a stream callback is active."""
        agent._stream_callback = lambda x: None
        captured = []

        def record(*args, **kwargs):
            captured.append(args)

        with patch("builtins.print", side_effect=record):
            agent._vprint("debug info")
        assert len(captured) == 0

    def test_all_shown_without_tts(self, agent):
        """With no stream callback, both plain and forced messages print."""
        agent._stream_callback = None
        captured = []

        def record(*args, **kwargs):
            captured.append(args)

        with patch("builtins.print", side_effect=record):
            agent._vprint("debug")
            agent._vprint("error", force=True)
        assert len(captured) == 2
|
|
|
|
|
|
class TestNormalizeCodexDictArguments:
|
|
"""_normalize_codex_response must produce valid JSON strings for tool
|
|
call arguments, even when the Responses API returns them as dicts."""
|
|
|
|
def _make_codex_response(self, item_type, arguments, item_status="completed"):
|
|
"""Build a minimal Responses API response with a single tool call."""
|
|
item = SimpleNamespace(
|
|
type=item_type,
|
|
status=item_status,
|
|
)
|
|
if item_type == "function_call":
|
|
item.name = "web_search"
|
|
item.arguments = arguments
|
|
item.call_id = "call_abc123"
|
|
item.id = "fc_abc123"
|
|
elif item_type == "custom_tool_call":
|
|
item.name = "web_search"
|
|
item.input = arguments
|
|
item.call_id = "call_abc123"
|
|
item.id = "fc_abc123"
|
|
return SimpleNamespace(
|
|
output=[item],
|
|
status="completed",
|
|
)
|
|
|
|
def test_function_call_dict_arguments_produce_valid_json(self, agent):
|
|
"""dict arguments from function_call must be serialised with
|
|
json.dumps, not str(), so downstream json.loads() succeeds."""
|
|
args_dict = {"query": "weather in NYC", "units": "celsius"}
|
|
response = self._make_codex_response("function_call", args_dict)
|
|
msg, _ = agent._normalize_codex_response(response)
|
|
tc = msg.tool_calls[0]
|
|
parsed = json.loads(tc.function.arguments)
|
|
assert parsed == args_dict
|
|
|
|
def test_custom_tool_call_dict_arguments_produce_valid_json(self, agent):
|
|
"""dict arguments from custom_tool_call must also use json.dumps."""
|
|
args_dict = {"path": "/tmp/test.txt", "content": "hello"}
|
|
response = self._make_codex_response("custom_tool_call", args_dict)
|
|
msg, _ = agent._normalize_codex_response(response)
|
|
tc = msg.tool_calls[0]
|
|
parsed = json.loads(tc.function.arguments)
|
|
assert parsed == args_dict
|
|
|
|
def test_string_arguments_unchanged(self, agent):
|
|
"""String arguments must pass through without modification."""
|
|
args_str = '{"query": "test"}'
|
|
response = self._make_codex_response("function_call", args_str)
|
|
msg, _ = agent._normalize_codex_response(response)
|
|
tc = msg.tool_calls[0]
|
|
assert tc.function.arguments == args_str
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# OAuth flag and nudge counter fixes (salvaged from PR #1797)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
class TestOAuthFlagAfterCredentialRefresh:
    """_is_anthropic_oauth must update when token type changes during refresh."""

    def test_oauth_flag_updates_api_key_to_oauth(self, agent):
        """Refreshing from API key to OAuth token must set flag to True."""
        agent.api_mode = "anthropic_messages"
        agent.provider = "anthropic"
        agent._anthropic_api_key = "sk-ant-api-old"
        agent._anthropic_client = MagicMock()
        agent._is_anthropic_oauth = False

        # Resolved token uses the "sk-ant-setup-" OAuth-style prefix.
        with (
            patch("agent.anthropic_adapter.resolve_anthropic_token",
                  return_value="sk-ant-setup-oauth-token"),
            patch("agent.anthropic_adapter.build_anthropic_client",
                  return_value=MagicMock()),
        ):
            result = agent._try_refresh_anthropic_client_credentials()

        assert result is True
        assert agent._is_anthropic_oauth is True

    def test_oauth_flag_updates_oauth_to_api_key(self, agent):
        """Refreshing from OAuth to API key must set flag to False."""
        agent.api_mode = "anthropic_messages"
        agent.provider = "anthropic"
        agent._anthropic_api_key = "sk-ant-setup-old"
        agent._anthropic_client = MagicMock()
        agent._is_anthropic_oauth = True

        # Resolved token uses the regular "sk-ant-api03-" API-key prefix.
        with (
            patch("agent.anthropic_adapter.resolve_anthropic_token",
                  return_value="sk-ant-api03-new-key"),
            patch("agent.anthropic_adapter.build_anthropic_client",
                  return_value=MagicMock()),
        ):
            result = agent._try_refresh_anthropic_client_credentials()

        assert result is True
        assert agent._is_anthropic_oauth is False
|
|
|
|
|
|
class TestFallbackSetsOAuthFlag:
    """_try_activate_fallback must set _is_anthropic_oauth for Anthropic fallbacks."""

    def test_fallback_to_anthropic_oauth_sets_flag(self, agent):
        """An OAuth-style key on the fallback client must set the flag True."""
        agent._fallback_activated = False
        agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-6"}

        mock_client = MagicMock()
        mock_client.base_url = "https://api.anthropic.com/v1"
        # "sk-ant-setup-" prefix marks an OAuth token.
        mock_client.api_key = "sk-ant-setup-oauth-token"

        with (
            patch("agent.auxiliary_client.resolve_provider_client",
                  return_value=(mock_client, None)),
            patch("agent.anthropic_adapter.build_anthropic_client",
                  return_value=MagicMock()),
            patch("agent.anthropic_adapter.resolve_anthropic_token",
                  return_value=None),
        ):
            result = agent._try_activate_fallback()

        assert result is True
        assert agent._is_anthropic_oauth is True

    def test_fallback_to_anthropic_api_key_clears_flag(self, agent):
        """A regular API key on the fallback client must clear the flag."""
        agent._fallback_activated = False
        agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-6"}

        mock_client = MagicMock()
        mock_client.base_url = "https://api.anthropic.com/v1"
        # "sk-ant-api03-" prefix marks a plain API key.
        mock_client.api_key = "sk-ant-api03-regular-key"

        with (
            patch("agent.auxiliary_client.resolve_provider_client",
                  return_value=(mock_client, None)),
            patch("agent.anthropic_adapter.build_anthropic_client",
                  return_value=MagicMock()),
            patch("agent.anthropic_adapter.resolve_anthropic_token",
                  return_value=None),
        ):
            result = agent._try_activate_fallback()

        assert result is True
        assert agent._is_anthropic_oauth is False
|
|
|
|
|
|
class TestMemoryNudgeCounterPersistence:
    """_turns_since_memory must persist across run_conversation calls."""

    def test_counters_initialized_in_init(self):
        """Counters must exist on the agent after __init__."""
        with patch("run_agent.get_tool_definitions", return_value=[]):
            a = AIAgent(
                model="test", api_key="test-key", provider="openrouter",
                skip_context_files=True, skip_memory=True,
            )
            assert hasattr(a, "_turns_since_memory")
            assert hasattr(a, "_iters_since_skill")
            assert a._turns_since_memory == 0
            assert a._iters_since_skill == 0

    def test_counters_not_reset_in_preamble(self):
        """The run_conversation preamble must not zero the nudge counters."""
        import inspect
        src = inspect.getsource(AIAgent.run_conversation)
        # The preamble resets many fields (retry counts, budget, etc.)
        # before the main loop. Find that reset block and verify our
        # counters aren't in it. The reset block ends at iteration_budget.
        preamble_end = src.index("self.iteration_budget = IterationBudget")
        preamble = src[:preamble_end]
        assert "self._turns_since_memory = 0" not in preamble
        assert "self._iters_since_skill = 0" not in preamble
|
|
|
|
|
|
class TestDeadRetryCode:
    """Unreachable retry_count >= max_retries after raise must not exist."""

    def test_no_unreachable_max_retries_after_backoff(self):
        """Exactly two reachable max-retries checks may appear in run_conversation."""
        import inspect

        needle = "if retry_count >= max_retries:"
        found = inspect.getsource(AIAgent.run_conversation).count(needle)
        assert found == 2, (
            f"Expected 2 occurrences of 'if retry_count >= max_retries:' "
            f"but found {found}"
        )