Files
hermes-agent/tests/test_run_agent.py

3330 lines
135 KiB
Python
Raw Permalink Normal View History

"""Unit tests for run_agent.py (AIAgent).
Tests cover pure functions, state/structure methods, and conversation loop
pieces. The OpenAI client and tool loading are mocked so no network calls
are made.
"""
import json
2026-03-13 23:56:51 -07:00
import logging
import re
import uuid
2026-03-13 23:56:51 -07:00
from logging.handlers import RotatingFileHandler
from pathlib import Path
from types import SimpleNamespace
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
2026-03-13 23:56:51 -07:00
import run_agent
from honcho_integration.client import HonchoClientConfig
from run_agent import AIAgent, _inject_honcho_turn_context
from agent.prompt_builder import DEFAULT_AGENT_IDENTITY
# ---------------------------------------------------------------------------
# Fixtures
# ---------------------------------------------------------------------------
def _make_tool_defs(*names: str) -> list:
"""Build minimal tool definition list accepted by AIAgent.__init__."""
return [
{
"type": "function",
"function": {
"name": n,
"description": f"{n} tool",
"parameters": {"type": "object", "properties": {}},
},
}
for n in names
]
@pytest.fixture()
def agent():
"""Minimal AIAgent with mocked OpenAI client and tool loading."""
with (
patch(
"run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")
),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
a.client = MagicMock()
return a
@pytest.fixture()
def agent_with_memory_tool():
"""Agent whose valid_tool_names includes 'memory'."""
with (
patch(
"run_agent.get_tool_definitions",
return_value=_make_tool_defs("web_search", "memory"),
),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
2026-03-13 23:56:51 -07:00
api_key="test-k...7890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
a.client = MagicMock()
return a
2026-03-13 23:56:51 -07:00
def test_aiagent_reuses_existing_errors_log_handler():
"""Repeated AIAgent init should not accumulate duplicate errors.log handlers."""
root_logger = logging.getLogger()
original_handlers = list(root_logger.handlers)
error_log_path = (run_agent._hermes_home / "logs" / "errors.log").resolve()
try:
for handler in list(root_logger.handlers):
root_logger.removeHandler(handler)
error_log_path.parent.mkdir(parents=True, exist_ok=True)
preexisting_handler = RotatingFileHandler(
error_log_path,
maxBytes=2 * 1024 * 1024,
backupCount=2,
)
root_logger.addHandler(preexisting_handler)
with (
patch(
"run_agent.get_tool_definitions",
return_value=_make_tool_defs("web_search"),
),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
AIAgent(
api_key="test-k...7890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
AIAgent(
api_key="test-k...7890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
matching_handlers = [
handler for handler in root_logger.handlers
if isinstance(handler, RotatingFileHandler)
and error_log_path == Path(handler.baseFilename).resolve()
]
assert len(matching_handlers) == 1
finally:
for handler in list(root_logger.handlers):
root_logger.removeHandler(handler)
if handler not in original_handlers:
handler.close()
for handler in original_handlers:
root_logger.addHandler(handler)
# ---------------------------------------------------------------------------
# Helper to build mock assistant messages (API response objects)
# ---------------------------------------------------------------------------
def _mock_assistant_msg(
content="Hello",
tool_calls=None,
reasoning=None,
reasoning_content=None,
reasoning_details=None,
):
"""Return a SimpleNamespace mimicking an OpenAI ChatCompletionMessage."""
msg = SimpleNamespace(content=content, tool_calls=tool_calls)
if reasoning is not None:
msg.reasoning = reasoning
if reasoning_content is not None:
msg.reasoning_content = reasoning_content
if reasoning_details is not None:
msg.reasoning_details = reasoning_details
return msg
def _mock_tool_call(name="web_search", arguments="{}", call_id=None):
"""Return a SimpleNamespace mimicking a tool call object."""
return SimpleNamespace(
id=call_id or f"call_{uuid.uuid4().hex[:8]}",
type="function",
function=SimpleNamespace(name=name, arguments=arguments),
)
def _mock_response(
content="Hello", finish_reason="stop", tool_calls=None, reasoning=None, usage=None
):
"""Return a SimpleNamespace mimicking an OpenAI ChatCompletion response."""
msg = _mock_assistant_msg(
content=content,
tool_calls=tool_calls,
reasoning=reasoning,
)
choice = SimpleNamespace(message=msg, finish_reason=finish_reason)
resp = SimpleNamespace(choices=[choice], model="test/model")
if usage:
resp.usage = SimpleNamespace(**usage)
else:
resp.usage = None
return resp
# ===================================================================
# Group 1: Pure Functions
# ===================================================================
class TestHasContentAfterThinkBlock:
def test_none_returns_false(self, agent):
assert agent._has_content_after_think_block(None) is False
def test_empty_returns_false(self, agent):
assert agent._has_content_after_think_block("") is False
def test_only_think_block_returns_false(self, agent):
assert agent._has_content_after_think_block("<think>reasoning</think>") is False
def test_content_after_think_returns_true(self, agent):
assert (
agent._has_content_after_think_block("<think>r</think> actual answer")
is True
)
def test_no_think_block_returns_true(self, agent):
assert agent._has_content_after_think_block("just normal content") is True
class TestStripThinkBlocks:
def test_none_returns_empty(self, agent):
assert agent._strip_think_blocks(None) == ""
def test_no_blocks_unchanged(self, agent):
assert agent._strip_think_blocks("hello world") == "hello world"
def test_single_block_removed(self, agent):
result = agent._strip_think_blocks("<think>reasoning</think> answer")
assert "reasoning" not in result
assert "answer" in result
def test_multiline_block_removed(self, agent):
text = "<think>\nline1\nline2\n</think>\nvisible"
result = agent._strip_think_blocks(text)
assert "line1" not in result
assert "visible" in result
class TestExtractReasoning:
def test_reasoning_field(self, agent):
msg = _mock_assistant_msg(reasoning="thinking hard")
assert agent._extract_reasoning(msg) == "thinking hard"
def test_reasoning_content_field(self, agent):
msg = _mock_assistant_msg(reasoning_content="deep thought")
assert agent._extract_reasoning(msg) == "deep thought"
def test_reasoning_details_array(self, agent):
msg = _mock_assistant_msg(
reasoning_details=[{"summary": "step-by-step analysis"}],
)
assert "step-by-step analysis" in agent._extract_reasoning(msg)
def test_no_reasoning_returns_none(self, agent):
msg = _mock_assistant_msg()
assert agent._extract_reasoning(msg) is None
def test_combined_reasoning(self, agent):
msg = _mock_assistant_msg(
reasoning="part1",
reasoning_content="part2",
)
result = agent._extract_reasoning(msg)
assert "part1" in result
assert "part2" in result
def test_deduplication(self, agent):
msg = _mock_assistant_msg(
reasoning="same text",
reasoning_content="same text",
)
result = agent._extract_reasoning(msg)
assert result == "same text"
@pytest.mark.parametrize(
("content", "expected"),
[
("<think>thinking hard</think>", "thinking hard"),
("<thinking>step by step</thinking>", "step by step"),
(
"<REASONING_SCRATCHPAD>scratch analysis</REASONING_SCRATCHPAD>",
"scratch analysis",
),
],
)
def test_inline_reasoning_blocks_fallback(self, agent, content, expected):
msg = _mock_assistant_msg(content=content)
assert agent._extract_reasoning(msg) == expected
class TestCleanSessionContent:
def test_none_passthrough(self):
assert AIAgent._clean_session_content(None) is None
def test_scratchpad_converted(self):
text = "<REASONING_SCRATCHPAD>think</REASONING_SCRATCHPAD> answer"
result = AIAgent._clean_session_content(text)
assert "<REASONING_SCRATCHPAD>" not in result
assert "<think>" in result
def test_extra_newlines_cleaned(self):
text = "\n\n\n<think>x</think>\n\n\nafter"
result = AIAgent._clean_session_content(text)
# Should not have excessive newlines around think block
assert "\n\n\n" not in result
# Content after think block must be preserved
assert "after" in result
class TestGetMessagesUpToLastAssistant:
def test_empty_list(self, agent):
assert agent._get_messages_up_to_last_assistant([]) == []
def test_no_assistant_returns_copy(self, agent):
msgs = [{"role": "user", "content": "hi"}]
result = agent._get_messages_up_to_last_assistant(msgs)
assert result == msgs
assert result is not msgs # should be a copy
def test_single_assistant(self, agent):
msgs = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hello"},
]
result = agent._get_messages_up_to_last_assistant(msgs)
assert len(result) == 1
assert result[0]["role"] == "user"
def test_multiple_assistants_returns_up_to_last(self, agent):
msgs = [
{"role": "user", "content": "q1"},
{"role": "assistant", "content": "a1"},
{"role": "user", "content": "q2"},
{"role": "assistant", "content": "a2"},
]
result = agent._get_messages_up_to_last_assistant(msgs)
assert len(result) == 3
assert result[-1]["content"] == "q2"
def test_assistant_then_tool_messages(self, agent):
msgs = [
{"role": "user", "content": "do something"},
{"role": "assistant", "content": "ok", "tool_calls": [{"id": "1"}]},
{"role": "tool", "content": "result", "tool_call_id": "1"},
]
# Last assistant is at index 1, so result = msgs[:1]
result = agent._get_messages_up_to_last_assistant(msgs)
assert len(result) == 1
assert result[0]["role"] == "user"
class TestMaskApiKey:
def test_none_returns_none(self, agent):
assert agent._mask_api_key_for_logs(None) is None
def test_short_key_returns_stars(self, agent):
assert agent._mask_api_key_for_logs("short") == "***"
def test_long_key_masked(self, agent):
key = "sk-or-v1-abcdefghijklmnop"
result = agent._mask_api_key_for_logs(key)
assert result.startswith("sk-or-v1")
assert result.endswith("mnop")
assert "..." in result
# ===================================================================
# Group 2: State / Structure Methods
# ===================================================================
class TestInit:
def test_anthropic_base_url_accepted(self):
feat: native Anthropic provider with Claude Code credential auto-discovery Add Anthropic as a first-class inference provider, bypassing OpenRouter for direct API access. Uses the native Anthropic SDK with a full format adapter (same pattern as the codex_responses api_mode). ## Auth (three methods, priority order) 1. ANTHROPIC_API_KEY env var (regular API key, sk-ant-api-*) 2. ANTHROPIC_TOKEN / CLAUDE_CODE_OAUTH_TOKEN env var (setup-token, sk-ant-oat-*) 3. Auto-discovery from ~/.claude/.credentials.json (Claude Code subscription) - Reads Claude Code's OAuth credentials - Checks token expiry with 60s buffer - Setup tokens use Bearer auth + anthropic-beta: oauth-2025-04-20 header - Regular API keys use standard x-api-key header ## Changes by file ### New files - agent/anthropic_adapter.py — Client builder, message/tool/response format conversion, Claude Code credential reader, token resolver. Handles system prompt extraction, tool_use/tool_result blocks, thinking/reasoning, orphaned tool_use cleanup, cache_control. - tests/test_anthropic_adapter.py — 36 tests covering all adapter logic ### Modified files - pyproject.toml — Add anthropic>=0.39.0 dependency - hermes_cli/auth.py — Add 'anthropic' to PROVIDER_REGISTRY with three env vars, plus 'claude'/'claude-code' aliases - hermes_cli/models.py — Add model catalog, labels, aliases, provider order - hermes_cli/main.py — Add 'anthropic' to --provider CLI choices - hermes_cli/runtime_provider.py — Add Anthropic branch returning api_mode='anthropic_messages' (before generic api_key fallthrough) - hermes_cli/setup.py — Add Anthropic setup wizard with Claude Code credential auto-discovery, model selection, OpenRouter tools prompt - agent/auxiliary_client.py — Add claude-haiku-4-5 as aux model - agent/model_metadata.py — Add bare Claude model context lengths - run_agent.py — Add anthropic_messages api_mode: * Client init (Anthropic SDK instead of OpenAI) * API call dispatch (_anthropic_client.messages.create) * Response validation (content blocks) * finish_reason mapping (stop_reason -> finish_reason) * Token usage (input_tokens/output_tokens) * Response normalization (normalize_anthropic_response) * Client interrupt/rebuild * Prompt caching auto-enabled for native Anthropic - tests/test_run_agent.py — Update test_anthropic_base_url_accepted to expect native routing, add test_prompt_caching_native_anthropic
2026-03-12 15:47:45 -07:00
"""Anthropic base URLs should route to native Anthropic client."""
with (
patch("run_agent.get_tool_definitions", return_value=[]),
patch("run_agent.check_toolset_requirements", return_value={}),
feat: native Anthropic provider with Claude Code credential auto-discovery Add Anthropic as a first-class inference provider, bypassing OpenRouter for direct API access. Uses the native Anthropic SDK with a full format adapter (same pattern as the codex_responses api_mode). ## Auth (three methods, priority order) 1. ANTHROPIC_API_KEY env var (regular API key, sk-ant-api-*) 2. ANTHROPIC_TOKEN / CLAUDE_CODE_OAUTH_TOKEN env var (setup-token, sk-ant-oat-*) 3. Auto-discovery from ~/.claude/.credentials.json (Claude Code subscription) - Reads Claude Code's OAuth credentials - Checks token expiry with 60s buffer - Setup tokens use Bearer auth + anthropic-beta: oauth-2025-04-20 header - Regular API keys use standard x-api-key header ## Changes by file ### New files - agent/anthropic_adapter.py — Client builder, message/tool/response format conversion, Claude Code credential reader, token resolver. Handles system prompt extraction, tool_use/tool_result blocks, thinking/reasoning, orphaned tool_use cleanup, cache_control. - tests/test_anthropic_adapter.py — 36 tests covering all adapter logic ### Modified files - pyproject.toml — Add anthropic>=0.39.0 dependency - hermes_cli/auth.py — Add 'anthropic' to PROVIDER_REGISTRY with three env vars, plus 'claude'/'claude-code' aliases - hermes_cli/models.py — Add model catalog, labels, aliases, provider order - hermes_cli/main.py — Add 'anthropic' to --provider CLI choices - hermes_cli/runtime_provider.py — Add Anthropic branch returning api_mode='anthropic_messages' (before generic api_key fallthrough) - hermes_cli/setup.py — Add Anthropic setup wizard with Claude Code credential auto-discovery, model selection, OpenRouter tools prompt - agent/auxiliary_client.py — Add claude-haiku-4-5 as aux model - agent/model_metadata.py — Add bare Claude model context lengths - run_agent.py — Add anthropic_messages api_mode: * Client init (Anthropic SDK instead of OpenAI) * API call dispatch (_anthropic_client.messages.create) * Response validation (content blocks) * finish_reason mapping (stop_reason -> finish_reason) * Token usage (input_tokens/output_tokens) * Response normalization (normalize_anthropic_response) * Client interrupt/rebuild * Prompt caching auto-enabled for native Anthropic - tests/test_run_agent.py — Update test_anthropic_base_url_accepted to expect native routing, add test_prompt_caching_native_anthropic
2026-03-12 15:47:45 -07:00
patch("agent.anthropic_adapter._anthropic_sdk") as mock_anthropic,
):
feat: native Anthropic provider with Claude Code credential auto-discovery Add Anthropic as a first-class inference provider, bypassing OpenRouter for direct API access. Uses the native Anthropic SDK with a full format adapter (same pattern as the codex_responses api_mode). ## Auth (three methods, priority order) 1. ANTHROPIC_API_KEY env var (regular API key, sk-ant-api-*) 2. ANTHROPIC_TOKEN / CLAUDE_CODE_OAUTH_TOKEN env var (setup-token, sk-ant-oat-*) 3. Auto-discovery from ~/.claude/.credentials.json (Claude Code subscription) - Reads Claude Code's OAuth credentials - Checks token expiry with 60s buffer - Setup tokens use Bearer auth + anthropic-beta: oauth-2025-04-20 header - Regular API keys use standard x-api-key header ## Changes by file ### New files - agent/anthropic_adapter.py — Client builder, message/tool/response format conversion, Claude Code credential reader, token resolver. Handles system prompt extraction, tool_use/tool_result blocks, thinking/reasoning, orphaned tool_use cleanup, cache_control. - tests/test_anthropic_adapter.py — 36 tests covering all adapter logic ### Modified files - pyproject.toml — Add anthropic>=0.39.0 dependency - hermes_cli/auth.py — Add 'anthropic' to PROVIDER_REGISTRY with three env vars, plus 'claude'/'claude-code' aliases - hermes_cli/models.py — Add model catalog, labels, aliases, provider order - hermes_cli/main.py — Add 'anthropic' to --provider CLI choices - hermes_cli/runtime_provider.py — Add Anthropic branch returning api_mode='anthropic_messages' (before generic api_key fallthrough) - hermes_cli/setup.py — Add Anthropic setup wizard with Claude Code credential auto-discovery, model selection, OpenRouter tools prompt - agent/auxiliary_client.py — Add claude-haiku-4-5 as aux model - agent/model_metadata.py — Add bare Claude model context lengths - run_agent.py — Add anthropic_messages api_mode: * Client init (Anthropic SDK instead of OpenAI) * API call dispatch (_anthropic_client.messages.create) * Response validation (content blocks) * finish_reason mapping (stop_reason -> finish_reason) * Token usage (input_tokens/output_tokens) * Response normalization (normalize_anthropic_response) * Client interrupt/rebuild * Prompt caching auto-enabled for native Anthropic - tests/test_run_agent.py — Update test_anthropic_base_url_accepted to expect native routing, add test_prompt_caching_native_anthropic
2026-03-12 15:47:45 -07:00
agent = AIAgent(
api_key="test-key-1234567890",
base_url="https://api.anthropic.com/v1/",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
feat: native Anthropic provider with Claude Code credential auto-discovery Add Anthropic as a first-class inference provider, bypassing OpenRouter for direct API access. Uses the native Anthropic SDK with a full format adapter (same pattern as the codex_responses api_mode). ## Auth (three methods, priority order) 1. ANTHROPIC_API_KEY env var (regular API key, sk-ant-api-*) 2. ANTHROPIC_TOKEN / CLAUDE_CODE_OAUTH_TOKEN env var (setup-token, sk-ant-oat-*) 3. Auto-discovery from ~/.claude/.credentials.json (Claude Code subscription) - Reads Claude Code's OAuth credentials - Checks token expiry with 60s buffer - Setup tokens use Bearer auth + anthropic-beta: oauth-2025-04-20 header - Regular API keys use standard x-api-key header ## Changes by file ### New files - agent/anthropic_adapter.py — Client builder, message/tool/response format conversion, Claude Code credential reader, token resolver. Handles system prompt extraction, tool_use/tool_result blocks, thinking/reasoning, orphaned tool_use cleanup, cache_control. - tests/test_anthropic_adapter.py — 36 tests covering all adapter logic ### Modified files - pyproject.toml — Add anthropic>=0.39.0 dependency - hermes_cli/auth.py — Add 'anthropic' to PROVIDER_REGISTRY with three env vars, plus 'claude'/'claude-code' aliases - hermes_cli/models.py — Add model catalog, labels, aliases, provider order - hermes_cli/main.py — Add 'anthropic' to --provider CLI choices - hermes_cli/runtime_provider.py — Add Anthropic branch returning api_mode='anthropic_messages' (before generic api_key fallthrough) - hermes_cli/setup.py — Add Anthropic setup wizard with Claude Code credential auto-discovery, model selection, OpenRouter tools prompt - agent/auxiliary_client.py — Add claude-haiku-4-5 as aux model - agent/model_metadata.py — Add bare Claude model context lengths - run_agent.py — Add anthropic_messages api_mode: * Client init (Anthropic SDK instead of OpenAI) * API call dispatch (_anthropic_client.messages.create) * Response validation (content blocks) * finish_reason mapping (stop_reason -> finish_reason) * Token usage (input_tokens/output_tokens) * Response normalization (normalize_anthropic_response) * Client interrupt/rebuild * Prompt caching auto-enabled for native Anthropic - tests/test_run_agent.py — Update test_anthropic_base_url_accepted to expect native routing, add test_prompt_caching_native_anthropic
2026-03-12 15:47:45 -07:00
assert agent.api_mode == "anthropic_messages"
mock_anthropic.Anthropic.assert_called_once()
def test_prompt_caching_claude_openrouter(self):
"""Claude model via OpenRouter should enable prompt caching."""
with (
patch("run_agent.get_tool_definitions", return_value=[]),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
model="anthropic/claude-sonnet-4-20250514",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert a._use_prompt_caching is True
def test_prompt_caching_non_claude(self):
"""Non-Claude model should disable prompt caching."""
with (
patch("run_agent.get_tool_definitions", return_value=[]),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
model="openai/gpt-4o",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert a._use_prompt_caching is False
def test_prompt_caching_non_openrouter(self):
"""Custom base_url (not OpenRouter) should disable prompt caching."""
with (
patch("run_agent.get_tool_definitions", return_value=[]),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
model="anthropic/claude-sonnet-4-20250514",
base_url="http://localhost:8080/v1",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert a._use_prompt_caching is False
feat: native Anthropic provider with Claude Code credential auto-discovery Add Anthropic as a first-class inference provider, bypassing OpenRouter for direct API access. Uses the native Anthropic SDK with a full format adapter (same pattern as the codex_responses api_mode). ## Auth (three methods, priority order) 1. ANTHROPIC_API_KEY env var (regular API key, sk-ant-api-*) 2. ANTHROPIC_TOKEN / CLAUDE_CODE_OAUTH_TOKEN env var (setup-token, sk-ant-oat-*) 3. Auto-discovery from ~/.claude/.credentials.json (Claude Code subscription) - Reads Claude Code's OAuth credentials - Checks token expiry with 60s buffer - Setup tokens use Bearer auth + anthropic-beta: oauth-2025-04-20 header - Regular API keys use standard x-api-key header ## Changes by file ### New files - agent/anthropic_adapter.py — Client builder, message/tool/response format conversion, Claude Code credential reader, token resolver. Handles system prompt extraction, tool_use/tool_result blocks, thinking/reasoning, orphaned tool_use cleanup, cache_control. - tests/test_anthropic_adapter.py — 36 tests covering all adapter logic ### Modified files - pyproject.toml — Add anthropic>=0.39.0 dependency - hermes_cli/auth.py — Add 'anthropic' to PROVIDER_REGISTRY with three env vars, plus 'claude'/'claude-code' aliases - hermes_cli/models.py — Add model catalog, labels, aliases, provider order - hermes_cli/main.py — Add 'anthropic' to --provider CLI choices - hermes_cli/runtime_provider.py — Add Anthropic branch returning api_mode='anthropic_messages' (before generic api_key fallthrough) - hermes_cli/setup.py — Add Anthropic setup wizard with Claude Code credential auto-discovery, model selection, OpenRouter tools prompt - agent/auxiliary_client.py — Add claude-haiku-4-5 as aux model - agent/model_metadata.py — Add bare Claude model context lengths - run_agent.py — Add anthropic_messages api_mode: * Client init (Anthropic SDK instead of OpenAI) * API call dispatch (_anthropic_client.messages.create) * Response validation (content blocks) * finish_reason mapping (stop_reason -> finish_reason) * Token usage (input_tokens/output_tokens) * Response normalization (normalize_anthropic_response) * Client interrupt/rebuild * Prompt caching auto-enabled for native Anthropic - tests/test_run_agent.py — Update test_anthropic_base_url_accepted to expect native routing, add test_prompt_caching_native_anthropic
2026-03-12 15:47:45 -07:00
def test_prompt_caching_native_anthropic(self):
"""Native Anthropic provider should enable prompt caching."""
with (
patch("run_agent.get_tool_definitions", return_value=[]),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter._anthropic_sdk"),
):
a = AIAgent(
api_key="test-key-1234567890",
base_url="https://api.anthropic.com/v1/",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert a.api_mode == "anthropic_messages"
assert a._use_prompt_caching is True
def test_valid_tool_names_populated(self):
"""valid_tool_names should contain names from loaded tools."""
tools = _make_tool_defs("web_search", "terminal")
with (
patch("run_agent.get_tool_definitions", return_value=tools),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert a.valid_tool_names == {"web_search", "terminal"}
def test_session_id_auto_generated(self):
"""Session ID should be auto-generated in YYYYMMDD_HHMMSS_<hex6> format."""
with (
patch("run_agent.get_tool_definitions", return_value=[]),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
):
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
# Format: YYYYMMDD_HHMMSS_<6 hex chars>
assert re.match(r"^\d{8}_\d{6}_[0-9a-f]{6}$", a.session_id), (
f"session_id doesn't match expected format: {a.session_id}"
)
class TestInterrupt:
def test_interrupt_sets_flag(self, agent):
with patch("run_agent._set_interrupt"):
agent.interrupt()
assert agent._interrupt_requested is True
def test_interrupt_with_message(self, agent):
with patch("run_agent._set_interrupt"):
agent.interrupt("new question")
assert agent._interrupt_message == "new question"
def test_clear_interrupt(self, agent):
with patch("run_agent._set_interrupt"):
agent.interrupt("msg")
agent.clear_interrupt()
assert agent._interrupt_requested is False
assert agent._interrupt_message is None
def test_is_interrupted_property(self, agent):
assert agent.is_interrupted is False
with patch("run_agent._set_interrupt"):
agent.interrupt()
assert agent.is_interrupted is True
class TestHydrateTodoStore:
def test_no_todo_in_history(self, agent):
history = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi"},
]
with patch("run_agent._set_interrupt"):
agent._hydrate_todo_store(history)
assert not agent._todo_store.has_items()
def test_recovers_from_history(self, agent):
todos = [{"id": "1", "content": "do thing", "status": "pending"}]
history = [
{"role": "user", "content": "plan"},
{"role": "assistant", "content": "ok"},
{
"role": "tool",
"content": json.dumps({"todos": todos}),
"tool_call_id": "c1",
},
]
with patch("run_agent._set_interrupt"):
agent._hydrate_todo_store(history)
assert agent._todo_store.has_items()
def test_skips_non_todo_tools(self, agent):
history = [
{
"role": "tool",
"content": '{"result": "search done"}',
"tool_call_id": "c1",
},
]
with patch("run_agent._set_interrupt"):
agent._hydrate_todo_store(history)
assert not agent._todo_store.has_items()
def test_invalid_json_skipped(self, agent):
history = [
{
"role": "tool",
"content": 'not valid json "todos" oops',
"tool_call_id": "c1",
},
]
with patch("run_agent._set_interrupt"):
agent._hydrate_todo_store(history)
assert not agent._todo_store.has_items()
class TestBuildSystemPrompt:
def test_always_has_identity(self, agent):
prompt = agent._build_system_prompt()
assert DEFAULT_AGENT_IDENTITY in prompt
def test_includes_system_message(self, agent):
prompt = agent._build_system_prompt(system_message="Custom instruction")
assert "Custom instruction" in prompt
def test_memory_guidance_when_memory_tool_loaded(self, agent_with_memory_tool):
from agent.prompt_builder import MEMORY_GUIDANCE
prompt = agent_with_memory_tool._build_system_prompt()
assert MEMORY_GUIDANCE in prompt
def test_no_memory_guidance_without_tool(self, agent):
from agent.prompt_builder import MEMORY_GUIDANCE
prompt = agent._build_system_prompt()
assert MEMORY_GUIDANCE not in prompt
def test_includes_datetime(self, agent):
prompt = agent._build_system_prompt()
# Should contain current date info like "Conversation started:"
assert "Conversation started:" in prompt
def test_skills_prompt_derives_available_toolsets_from_loaded_tools(self):
tools = _make_tool_defs("web_search", "skills_list", "skill_view", "skill_manage")
toolset_map = {
"web_search": "web",
"skills_list": "skills",
"skill_view": "skills",
"skill_manage": "skills",
}
with (
patch("run_agent.get_tool_definitions", return_value=tools),
patch(
"run_agent.check_toolset_requirements",
side_effect=AssertionError("should not re-check toolset requirements"),
),
patch("run_agent.get_toolset_for_tool", create=True, side_effect=toolset_map.get),
patch("run_agent.build_skills_system_prompt", return_value="SKILLS_PROMPT") as mock_skills,
patch("run_agent.OpenAI"),
):
agent = AIAgent(
api_key="test-k...7890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
prompt = agent._build_system_prompt()
assert "SKILLS_PROMPT" in prompt
assert mock_skills.call_args.kwargs["available_tools"] == set(toolset_map)
assert mock_skills.call_args.kwargs["available_toolsets"] == {"web", "skills"}
class TestToolUseEnforcementConfig:
"""Tests for the agent.tool_use_enforcement config option."""
def _make_agent(self, model="openai/gpt-4.1", tool_use_enforcement="auto"):
"""Create an agent with tools and a specific enforcement config."""
with (
patch(
"run_agent.get_tool_definitions",
return_value=_make_tool_defs("terminal", "web_search"),
),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch(
"hermes_cli.config.load_config",
return_value={"agent": {"tool_use_enforcement": tool_use_enforcement}},
),
):
a = AIAgent(
model=model,
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
a.client = MagicMock()
return a
def test_auto_injects_for_gpt(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(model="openai/gpt-4.1", tool_use_enforcement="auto")
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE in prompt
def test_auto_injects_for_codex(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(model="openai/codex-mini", tool_use_enforcement="auto")
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE in prompt
def test_auto_skips_for_claude(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(model="anthropic/claude-sonnet-4", tool_use_enforcement="auto")
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE not in prompt
def test_true_forces_for_all_models(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(model="anthropic/claude-sonnet-4", tool_use_enforcement=True)
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE in prompt
def test_string_true_forces_for_all_models(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(model="anthropic/claude-sonnet-4", tool_use_enforcement="true")
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE in prompt
def test_always_forces_for_all_models(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(model="deepseek/deepseek-r1", tool_use_enforcement="always")
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE in prompt
def test_false_disables_for_gpt(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(model="openai/gpt-4.1", tool_use_enforcement=False)
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE not in prompt
def test_string_false_disables(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(model="openai/gpt-4.1", tool_use_enforcement="off")
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE not in prompt
def test_custom_list_matches(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(
model="deepseek/deepseek-r1",
tool_use_enforcement=["deepseek", "gemini"],
)
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE in prompt
def test_custom_list_no_match(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(
model="anthropic/claude-sonnet-4",
tool_use_enforcement=["deepseek", "gemini"],
)
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE not in prompt
def test_custom_list_case_insensitive(self):
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
agent = self._make_agent(
model="openai/GPT-4.1",
tool_use_enforcement=["GPT", "Codex"],
)
prompt = agent._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE in prompt
def test_no_tools_never_injects(self):
"""Even with enforcement=true, no injection when agent has no tools."""
from agent.prompt_builder import TOOL_USE_ENFORCEMENT_GUIDANCE
with (
patch("run_agent.get_tool_definitions", return_value=[]),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch(
"hermes_cli.config.load_config",
return_value={"agent": {"tool_use_enforcement": True}},
),
):
a = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
enabled_toolsets=[],
)
a.client = MagicMock()
prompt = a._build_system_prompt()
assert TOOL_USE_ENFORCEMENT_GUIDANCE not in prompt
class TestInvalidateSystemPrompt:
def test_clears_cache(self, agent):
agent._cached_system_prompt = "cached value"
agent._invalidate_system_prompt()
assert agent._cached_system_prompt is None
def test_reloads_memory_store(self, agent):
mock_store = MagicMock()
agent._memory_store = mock_store
agent._cached_system_prompt = "cached"
agent._invalidate_system_prompt()
mock_store.load_from_disk.assert_called_once()
class TestBuildApiKwargs:
def test_basic_kwargs(self, agent):
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert kwargs["model"] == agent.model
assert kwargs["messages"] is messages
assert kwargs["timeout"] == 1800.0
def test_provider_preferences_injected(self, agent):
agent.providers_allowed = ["Anthropic"]
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert kwargs["extra_body"]["provider"]["only"] == ["Anthropic"]
def test_reasoning_config_default_openrouter(self, agent):
"""Default reasoning config for OpenRouter should be medium."""
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
reasoning = kwargs["extra_body"]["reasoning"]
assert reasoning["enabled"] is True
assert reasoning["effort"] == "medium"
def test_reasoning_config_custom(self, agent):
agent.reasoning_config = {"enabled": False}
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert kwargs["extra_body"]["reasoning"] == {"enabled": False}
def test_reasoning_not_sent_for_unsupported_openrouter_model(self, agent):
agent.model = "minimax/minimax-m2.5"
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "reasoning" not in kwargs.get("extra_body", {})
def test_reasoning_sent_for_supported_openrouter_model(self, agent):
agent.model = "qwen/qwen3.5-plus-02-15"
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert kwargs["extra_body"]["reasoning"]["effort"] == "medium"
def test_reasoning_sent_for_nous_route(self, agent):
agent.base_url = "https://inference-api.nousresearch.com/v1"
agent.model = "minimax/minimax-m2.5"
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert kwargs["extra_body"]["reasoning"]["effort"] == "medium"
def test_reasoning_sent_for_copilot_gpt5(self, agent):
agent.base_url = "https://api.githubcopilot.com"
agent.model = "gpt-5.4"
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert kwargs["extra_body"]["reasoning"] == {"effort": "medium"}
def test_reasoning_xhigh_normalized_for_copilot(self, agent):
agent.base_url = "https://api.githubcopilot.com"
agent.model = "gpt-5.4"
agent.reasoning_config = {"enabled": True, "effort": "xhigh"}
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert kwargs["extra_body"]["reasoning"] == {"effort": "high"}
def test_reasoning_omitted_for_non_reasoning_copilot_model(self, agent):
agent.base_url = "https://api.githubcopilot.com"
agent.model = "gpt-4.1"
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert "reasoning" not in kwargs.get("extra_body", {})
def test_max_tokens_injected(self, agent):
agent.max_tokens = 4096
messages = [{"role": "user", "content": "hi"}]
kwargs = agent._build_api_kwargs(messages)
assert kwargs["max_tokens"] == 4096
class TestBuildAssistantMessage:
def test_basic_message(self, agent):
msg = _mock_assistant_msg(content="Hello!")
result = agent._build_assistant_message(msg, "stop")
assert result["role"] == "assistant"
assert result["content"] == "Hello!"
assert result["finish_reason"] == "stop"
def test_with_reasoning(self, agent):
msg = _mock_assistant_msg(content="answer", reasoning="thinking")
result = agent._build_assistant_message(msg, "stop")
assert result["reasoning"] == "thinking"
def test_with_tool_calls(self, agent):
tc = _mock_tool_call(name="web_search", arguments='{"q":"test"}', call_id="c1")
msg = _mock_assistant_msg(content="", tool_calls=[tc])
result = agent._build_assistant_message(msg, "tool_calls")
assert len(result["tool_calls"]) == 1
assert result["tool_calls"][0]["function"]["name"] == "web_search"
def test_with_reasoning_details(self, agent):
details = [{"type": "reasoning.summary", "text": "step1", "signature": "sig1"}]
msg = _mock_assistant_msg(content="ans", reasoning_details=details)
result = agent._build_assistant_message(msg, "stop")
assert "reasoning_details" in result
assert result["reasoning_details"][0]["text"] == "step1"
def test_empty_content(self, agent):
msg = _mock_assistant_msg(content=None)
result = agent._build_assistant_message(msg, "stop")
assert result["content"] == ""
def test_tool_call_extra_content_preserved(self, agent):
"""Gemini thinking models attach extra_content with thought_signature
to tool calls. This must be preserved so subsequent API calls include it."""
tc = _mock_tool_call(
name="get_weather", arguments='{"city":"NYC"}', call_id="c2"
)
tc.extra_content = {"google": {"thought_signature": "abc123"}}
msg = _mock_assistant_msg(content="", tool_calls=[tc])
result = agent._build_assistant_message(msg, "tool_calls")
assert result["tool_calls"][0]["extra_content"] == {
"google": {"thought_signature": "abc123"}
}
def test_tool_call_without_extra_content(self, agent):
"""Standard tool calls (no thinking model) should not have extra_content."""
tc = _mock_tool_call(name="web_search", arguments="{}", call_id="c3")
msg = _mock_assistant_msg(content="", tool_calls=[tc])
result = agent._build_assistant_message(msg, "tool_calls")
assert "extra_content" not in result["tool_calls"][0]
class TestFormatToolsForSystemMessage:
def test_no_tools_returns_empty_array(self, agent):
agent.tools = []
assert agent._format_tools_for_system_message() == "[]"
def test_formats_single_tool(self, agent):
agent.tools = _make_tool_defs("web_search")
result = agent._format_tools_for_system_message()
parsed = json.loads(result)
assert len(parsed) == 1
assert parsed[0]["name"] == "web_search"
def test_formats_multiple_tools(self, agent):
agent.tools = _make_tool_defs("web_search", "terminal", "read_file")
result = agent._format_tools_for_system_message()
parsed = json.loads(result)
assert len(parsed) == 3
names = {t["name"] for t in parsed}
assert names == {"web_search", "terminal", "read_file"}
# ===================================================================
# Group 3: Conversation Loop Pieces (OpenAI mock)
# ===================================================================
class TestExecuteToolCalls:
def test_single_tool_executed(self, agent):
tc = _mock_tool_call(name="web_search", arguments='{"q":"test"}', call_id="c1")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
messages = []
with patch(
"run_agent.handle_function_call", return_value="search result"
) as mock_hfc:
agent._execute_tool_calls(mock_msg, messages, "task-1")
# enabled_tools passes the agent's own valid_tool_names
args, kwargs = mock_hfc.call_args
assert args[:3] == ("web_search", {"q": "test"}, "task-1")
assert set(kwargs.get("enabled_tools", [])) == agent.valid_tool_names
assert len(messages) == 1
assert messages[0]["role"] == "tool"
assert "search result" in messages[0]["content"]
def test_interrupt_skips_remaining(self, agent):
tc1 = _mock_tool_call(name="web_search", arguments="{}", call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments="{}", call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch("run_agent._set_interrupt"):
agent.interrupt()
agent._execute_tool_calls(mock_msg, messages, "task-1")
# Both calls should be skipped with cancellation messages
assert len(messages) == 2
assert (
"cancelled" in messages[0]["content"].lower()
or "interrupted" in messages[0]["content"].lower()
)
def test_invalid_json_args_defaults_empty(self, agent):
tc = _mock_tool_call(
name="web_search", arguments="not valid json", call_id="c1"
)
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
messages = []
with patch("run_agent.handle_function_call", return_value="ok") as mock_hfc:
agent._execute_tool_calls(mock_msg, messages, "task-1")
# Invalid JSON args should fall back to empty dict
args, kwargs = mock_hfc.call_args
assert args[:3] == ("web_search", {}, "task-1")
assert set(kwargs.get("enabled_tools", [])) == agent.valid_tool_names
assert len(messages) == 1
assert messages[0]["role"] == "tool"
assert messages[0]["tool_call_id"] == "c1"
def test_result_truncation_over_100k(self, agent):
tc = _mock_tool_call(name="web_search", arguments="{}", call_id="c1")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
messages = []
big_result = "x" * 150_000
with patch("run_agent.handle_function_call", return_value=big_result):
agent._execute_tool_calls(mock_msg, messages, "task-1")
# Content should be truncated
assert len(messages[0]["content"]) < 150_000
assert "Truncated" in messages[0]["content"]
class TestConcurrentToolExecution:
"""Tests for _execute_tool_calls_concurrent and dispatch logic."""
def test_single_tool_uses_sequential_path(self, agent):
"""Single tool call should use sequential path, not concurrent."""
tc = _mock_tool_call(name="web_search", arguments='{"q":"test"}', call_id="c1")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
agent._execute_tool_calls(mock_msg, messages, "task-1")
mock_seq.assert_called_once()
mock_con.assert_not_called()
def test_clarify_forces_sequential(self, agent):
"""Batch containing clarify should use sequential path."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="clarify", arguments='{"question":"ok?"}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
agent._execute_tool_calls(mock_msg, messages, "task-1")
mock_seq.assert_called_once()
mock_con.assert_not_called()
def test_multiple_tools_uses_concurrent_path(self, agent):
"""Multiple read-only tools should use concurrent path."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="read_file", arguments='{"path":"x.py"}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
agent._execute_tool_calls(mock_msg, messages, "task-1")
mock_con.assert_called_once()
mock_seq.assert_not_called()
def test_terminal_batch_forces_sequential(self, agent):
"""Stateful tools should not share the concurrent execution path."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="terminal", arguments='{"command":"pwd"}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
agent._execute_tool_calls(mock_msg, messages, "task-1")
mock_seq.assert_called_once()
mock_con.assert_not_called()
def test_write_batch_forces_sequential(self, agent):
"""File mutations should stay ordered within a turn."""
tc1 = _mock_tool_call(name="read_file", arguments='{"path":"x.py"}', call_id="c1")
tc2 = _mock_tool_call(name="write_file", arguments='{"path":"x.py","content":"print(1)"}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
agent._execute_tool_calls(mock_msg, messages, "task-1")
mock_seq.assert_called_once()
mock_con.assert_not_called()
def test_disjoint_write_batch_uses_concurrent_path(self, agent):
"""Independent file writes should still run concurrently."""
tc1 = _mock_tool_call(
name="write_file",
arguments='{"path":"src/a.py","content":"print(1)"}',
call_id="c1",
)
tc2 = _mock_tool_call(
name="write_file",
arguments='{"path":"src/b.py","content":"print(2)"}',
call_id="c2",
)
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
agent._execute_tool_calls(mock_msg, messages, "task-1")
mock_con.assert_called_once()
mock_seq.assert_not_called()
def test_overlapping_write_batch_forces_sequential(self, agent):
"""Writes to the same file must stay ordered."""
tc1 = _mock_tool_call(
name="write_file",
arguments='{"path":"src/a.py","content":"print(1)"}',
call_id="c1",
)
tc2 = _mock_tool_call(
name="patch",
arguments='{"path":"src/a.py","old_string":"1","new_string":"2"}',
call_id="c2",
)
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
agent._execute_tool_calls(mock_msg, messages, "task-1")
mock_seq.assert_called_once()
mock_con.assert_not_called()
def test_malformed_json_args_forces_sequential(self, agent):
"""Unparseable tool arguments should fall back to sequential."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments="NOT JSON {{{", call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
agent._execute_tool_calls(mock_msg, messages, "task-1")
mock_seq.assert_called_once()
mock_con.assert_not_called()
def test_non_dict_args_forces_sequential(self, agent):
"""Tool arguments that parse to a non-dict type should fall back to sequential."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments='"just a string"', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch.object(agent, "_execute_tool_calls_sequential") as mock_seq:
with patch.object(agent, "_execute_tool_calls_concurrent") as mock_con:
agent._execute_tool_calls(mock_msg, messages, "task-1")
mock_seq.assert_called_once()
mock_con.assert_not_called()
def test_concurrent_executes_all_tools(self, agent):
"""Concurrent path should execute all tools and append results in order."""
tc1 = _mock_tool_call(name="web_search", arguments='{"q":"alpha"}', call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments='{"q":"beta"}', call_id="c2")
tc3 = _mock_tool_call(name="web_search", arguments='{"q":"gamma"}', call_id="c3")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2, tc3])
messages = []
call_log = []
def fake_handle(name, args, task_id, **kwargs):
call_log.append(name)
return json.dumps({"result": args.get("q", "")})
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 3
# Results must be in original order
assert messages[0]["tool_call_id"] == "c1"
assert messages[1]["tool_call_id"] == "c2"
assert messages[2]["tool_call_id"] == "c3"
# All should be tool messages
assert all(m["role"] == "tool" for m in messages)
# Content should contain the query results
assert "alpha" in messages[0]["content"]
assert "beta" in messages[1]["content"]
assert "gamma" in messages[2]["content"]
def test_concurrent_preserves_order_despite_timing(self, agent):
"""Even if tools finish in different order, messages should be in original order."""
import time as _time
tc1 = _mock_tool_call(name="web_search", arguments='{"q":"slow"}', call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments='{"q":"fast"}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
def fake_handle(name, args, task_id, **kwargs):
q = args.get("q", "")
if q == "slow":
_time.sleep(0.1) # Slow tool
return f"result_{q}"
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert messages[0]["tool_call_id"] == "c1"
assert "result_slow" in messages[0]["content"]
assert messages[1]["tool_call_id"] == "c2"
assert "result_fast" in messages[1]["content"]
def test_concurrent_handles_tool_error(self, agent):
"""If one tool raises, others should still complete."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments='{}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
call_count = [0]
def fake_handle(name, args, task_id, **kwargs):
call_count[0] += 1
if call_count[0] == 1:
raise RuntimeError("boom")
return "success"
with patch("run_agent.handle_function_call", side_effect=fake_handle):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 2
# First tool should have error
assert "Error" in messages[0]["content"] or "boom" in messages[0]["content"]
# Second tool should succeed
assert "success" in messages[1]["content"]
def test_concurrent_interrupt_before_start(self, agent):
"""If interrupt is requested before concurrent execution, all tools are skipped."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="read_file", arguments='{}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
with patch("run_agent._set_interrupt"):
agent.interrupt()
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 2
assert "cancelled" in messages[0]["content"].lower() or "skipped" in messages[0]["content"].lower()
assert "cancelled" in messages[1]["content"].lower() or "skipped" in messages[1]["content"].lower()
def test_concurrent_truncates_large_results(self, agent):
"""Concurrent path should truncate results over 100k chars."""
tc1 = _mock_tool_call(name="web_search", arguments='{}', call_id="c1")
tc2 = _mock_tool_call(name="web_search", arguments='{}', call_id="c2")
mock_msg = _mock_assistant_msg(content="", tool_calls=[tc1, tc2])
messages = []
big_result = "x" * 150_000
with patch("run_agent.handle_function_call", return_value=big_result):
agent._execute_tool_calls_concurrent(mock_msg, messages, "task-1")
assert len(messages) == 2
for m in messages:
assert len(m["content"]) < 150_000
assert "Truncated" in m["content"]
def test_invoke_tool_dispatches_to_handle_function_call(self, agent):
"""_invoke_tool should route regular tools through handle_function_call."""
with patch("run_agent.handle_function_call", return_value="result") as mock_hfc:
result = agent._invoke_tool("web_search", {"q": "test"}, "task-1")
mock_hfc.assert_called_once_with(
"web_search", {"q": "test"}, "task-1",
enabled_tools=list(agent.valid_tool_names),
honcho_manager=None,
honcho_session_key=None,
)
assert result == "result"
def test_invoke_tool_handles_agent_level_tools(self, agent):
"""_invoke_tool should handle todo tool directly."""
with patch("tools.todo_tool.todo_tool", return_value='{"ok":true}') as mock_todo:
result = agent._invoke_tool("todo", {"todos": []}, "task-1")
mock_todo.assert_called_once()
assert "ok" in result
class TestPathsOverlap:
"""Unit tests for the _paths_overlap helper."""
def test_same_path_overlaps(self):
from run_agent import _paths_overlap
assert _paths_overlap(Path("src/a.py"), Path("src/a.py"))
def test_siblings_do_not_overlap(self):
from run_agent import _paths_overlap
assert not _paths_overlap(Path("src/a.py"), Path("src/b.py"))
def test_parent_child_overlap(self):
from run_agent import _paths_overlap
assert _paths_overlap(Path("src"), Path("src/sub/a.py"))
def test_different_roots_do_not_overlap(self):
from run_agent import _paths_overlap
assert not _paths_overlap(Path("src/a.py"), Path("other/a.py"))
def test_nested_vs_flat_do_not_overlap(self):
from run_agent import _paths_overlap
assert not _paths_overlap(Path("src/sub/a.py"), Path("src/a.py"))
def test_empty_paths_do_not_overlap(self):
from run_agent import _paths_overlap
assert not _paths_overlap(Path(""), Path(""))
def test_one_empty_path_does_not_overlap(self):
from run_agent import _paths_overlap
assert not _paths_overlap(Path(""), Path("src/a.py"))
assert not _paths_overlap(Path("src/a.py"), Path(""))
class TestHandleMaxIterations:
def test_returns_summary(self, agent):
resp = _mock_response(content="Here is a summary of what I did.")
agent.client.chat.completions.create.return_value = resp
agent._cached_system_prompt = "You are helpful."
messages = [{"role": "user", "content": "do stuff"}]
result = agent._handle_max_iterations(messages, 60)
assert isinstance(result, str)
assert len(result) > 0
assert "summary" in result.lower()
def test_api_failure_returns_error(self, agent):
agent.client.chat.completions.create.side_effect = Exception("API down")
agent._cached_system_prompt = "You are helpful."
messages = [{"role": "user", "content": "do stuff"}]
result = agent._handle_max_iterations(messages, 60)
assert isinstance(result, str)
assert "error" in result.lower()
assert "API down" in result
def test_summary_skips_reasoning_for_unsupported_openrouter_model(self, agent):
agent.model = "minimax/minimax-m2.5"
resp = _mock_response(content="Summary")
agent.client.chat.completions.create.return_value = resp
agent._cached_system_prompt = "You are helpful."
messages = [{"role": "user", "content": "do stuff"}]
result = agent._handle_max_iterations(messages, 60)
assert result == "Summary"
kwargs = agent.client.chat.completions.create.call_args.kwargs
assert "reasoning" not in kwargs.get("extra_body", {})
class TestRunConversation:
"""Tests for the main run_conversation method.
Each test mocks client.chat.completions.create to return controlled
responses, exercising different code paths without real API calls.
"""
def _setup_agent(self, agent):
"""Common setup for run_conversation tests."""
agent._cached_system_prompt = "You are helpful."
agent._use_prompt_caching = False
agent.tool_delay = 0
agent.compression_enabled = False
agent.save_trajectories = False
def test_stop_finish_reason_returns_response(self, agent):
self._setup_agent(agent)
resp = _mock_response(content="Final answer", finish_reason="stop")
agent.client.chat.completions.create.return_value = resp
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello")
assert result["final_response"] == "Final answer"
assert result["completed"] is True
def test_tool_calls_then_stop(self, agent):
self._setup_agent(agent)
tc = _mock_tool_call(name="web_search", arguments="{}", call_id="c1")
resp1 = _mock_response(content="", finish_reason="tool_calls", tool_calls=[tc])
resp2 = _mock_response(content="Done searching", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [resp1, resp2]
with (
patch("run_agent.handle_function_call", return_value="search result"),
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("search something")
assert result["final_response"] == "Done searching"
assert result["api_calls"] == 2
def test_interrupt_breaks_loop(self, agent):
self._setup_agent(agent)
def interrupt_side_effect(api_kwargs):
agent._interrupt_requested = True
raise InterruptedError("Agent interrupted during API call")
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
patch("run_agent._set_interrupt"),
patch.object(
agent, "_interruptible_api_call", side_effect=interrupt_side_effect
),
):
result = agent.run_conversation("hello")
assert result["interrupted"] is True
def test_invalid_tool_name_retry(self, agent):
"""Model hallucinates an invalid tool name, agent retries and succeeds."""
self._setup_agent(agent)
bad_tc = _mock_tool_call(name="nonexistent_tool", arguments="{}", call_id="c1")
resp_bad = _mock_response(
content="", finish_reason="tool_calls", tool_calls=[bad_tc]
)
resp_good = _mock_response(content="Got it", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [resp_bad, resp_good]
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("do something")
assert result["final_response"] == "Got it"
assert result["completed"] is True
assert result["api_calls"] == 2
def test_empty_content_retry_uses_inline_reasoning_as_response(self, agent):
"""Reasoning-only payloads should recover the inline reasoning text."""
self._setup_agent(agent)
empty_resp = _mock_response(
content="<think>internal reasoning</think>",
finish_reason="stop",
)
# Return empty 3 times to exhaust retries
agent.client.chat.completions.create.side_effect = [
empty_resp,
empty_resp,
empty_resp,
]
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("answer me")
assert result["completed"] is True
assert result["final_response"] == "internal reasoning"
def test_nous_401_refreshes_after_remint_and_retries(self, agent):
self._setup_agent(agent)
agent.provider = "nous"
agent.api_mode = "chat_completions"
calls = {"api": 0, "refresh": 0}
class _UnauthorizedError(RuntimeError):
def __init__(self):
super().__init__("Error code: 401 - unauthorized")
self.status_code = 401
def _fake_api_call(api_kwargs):
calls["api"] += 1
if calls["api"] == 1:
raise _UnauthorizedError()
return _mock_response(
content="Recovered after remint", finish_reason="stop"
)
def _fake_refresh(*, force=True):
calls["refresh"] += 1
assert force is True
return True
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call),
patch.object(
agent, "_try_refresh_nous_client_credentials", side_effect=_fake_refresh
),
):
result = agent.run_conversation("hello")
assert calls["api"] == 2
assert calls["refresh"] == 1
assert result["completed"] is True
assert result["final_response"] == "Recovered after remint"
def test_context_compression_triggered(self, agent):
"""When compressor says should_compress, compression runs."""
self._setup_agent(agent)
agent.compression_enabled = True
tc = _mock_tool_call(name="web_search", arguments="{}", call_id="c1")
resp1 = _mock_response(content="", finish_reason="tool_calls", tool_calls=[tc])
resp2 = _mock_response(content="All done", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [resp1, resp2]
with (
patch("run_agent.handle_function_call", return_value="result"),
patch.object(
agent.context_compressor, "should_compress", return_value=True
),
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
# _compress_context should return (messages, system_prompt)
mock_compress.return_value = (
[{"role": "user", "content": "search something"}],
"compressed system prompt",
)
result = agent.run_conversation("search something")
mock_compress.assert_called_once()
assert result["final_response"] == "All done"
assert result["completed"] is True
def test_glm_prompt_exceeds_max_length_triggers_compression(self, agent):
"""GLM/Z.AI uses 'Prompt exceeds max length' for context overflow."""
self._setup_agent(agent)
err_400 = Exception(
"Error code: 400 - {'error': {'code': '1261', 'message': 'Prompt exceeds max length'}}"
)
err_400.status_code = 400
ok_resp = _mock_response(content="Recovered after compression", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [err_400, ok_resp]
prefill = [
{"role": "user", "content": "previous question"},
{"role": "assistant", "content": "previous answer"},
]
with (
patch.object(agent, "_compress_context") as mock_compress,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
mock_compress.return_value = (
[{"role": "user", "content": "hello"}],
"compressed system prompt",
)
result = agent.run_conversation("hello", conversation_history=prefill)
mock_compress.assert_called_once()
assert result["final_response"] == "Recovered after compression"
assert result["completed"] is True
def test_length_finish_reason_requests_continuation(self, agent):
"""Normal truncation (partial real content) triggers continuation."""
self._setup_agent(agent)
first = _mock_response(content="Part 1 ", finish_reason="length")
second = _mock_response(content="Part 2", finish_reason="stop")
agent.client.chat.completions.create.side_effect = [first, second]
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello")
assert result["completed"] is True
assert result["api_calls"] == 2
assert result["final_response"] == "Part 1 Part 2"
second_call_messages = agent.client.chat.completions.create.call_args_list[1].kwargs["messages"]
assert second_call_messages[-1]["role"] == "user"
assert "truncated by the output length limit" in second_call_messages[-1]["content"]
def test_length_thinking_exhausted_skips_continuation(self, agent):
"""When finish_reason='length' but content is only thinking, skip retries."""
self._setup_agent(agent)
resp = _mock_response(
content="<think>internal reasoning</think>",
finish_reason="length",
)
agent.client.chat.completions.create.return_value = resp
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello")
# Should return immediately — no continuation, only 1 API call
assert result["completed"] is False
assert result["api_calls"] == 1
assert "reasoning" in result["error"].lower()
assert "output tokens" in result["error"].lower()
# Should have a user-friendly response (not None)
assert result["final_response"] is not None
assert "Thinking Budget Exhausted" in result["final_response"]
assert "/thinkon" in result["final_response"]
def test_length_empty_content_detected_as_thinking_exhausted(self, agent):
"""When finish_reason='length' and content is None/empty, detect exhaustion."""
self._setup_agent(agent)
resp = _mock_response(content=None, finish_reason="length")
agent.client.chat.completions.create.return_value = resp
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation("hello")
assert result["completed"] is False
assert result["api_calls"] == 1
assert "reasoning" in result["error"].lower()
# User-friendly message is returned
assert result["final_response"] is not None
assert "Thinking Budget Exhausted" in result["final_response"]
class TestRetryExhaustion:
"""Regression: retry_count > max_retries was dead code (off-by-one).
When retries were exhausted the condition never triggered, causing
the loop to exit and fall through to response.choices[0] on an
invalid response, raising IndexError.
"""
def _setup_agent(self, agent):
agent._cached_system_prompt = "You are helpful."
agent._use_prompt_caching = False
agent.tool_delay = 0
agent.compression_enabled = False
agent.save_trajectories = False
@staticmethod
def _make_fast_time_mock():
"""Return a mock time module where sleep loops exit instantly."""
mock_time = MagicMock()
_t = [1000.0]
def _advancing_time():
_t[0] += 500.0 # jump 500s per call so sleep_end is always in the past
return _t[0]
mock_time.time.side_effect = _advancing_time
mock_time.sleep = MagicMock() # no-op
mock_time.monotonic.return_value = 12345.0
return mock_time
def test_invalid_response_returns_error_not_crash(self, agent):
"""Exhausted retries on invalid (empty choices) response must not IndexError."""
self._setup_agent(agent)
# Return response with empty choices every time
bad_resp = SimpleNamespace(
choices=[],
model="test/model",
usage=None,
)
agent.client.chat.completions.create.return_value = bad_resp
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
patch("run_agent.time", self._make_fast_time_mock()),
):
result = agent.run_conversation("hello")
assert result.get("completed") is False, (
f"Expected completed=False, got: {result}"
)
assert result.get("failed") is True
assert "error" in result
assert "Invalid API response" in result["error"]
def test_api_error_returns_gracefully_after_retries(self, agent):
"""Exhausted retries on API errors must return error result, not crash."""
self._setup_agent(agent)
agent.client.chat.completions.create.side_effect = RuntimeError("rate limited")
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
patch("run_agent.time", self._make_fast_time_mock()),
):
result = agent.run_conversation("hello")
assert result.get("completed") is False
assert result.get("failed") is True
assert "error" in result
assert "rate limited" in result["error"]
# ---------------------------------------------------------------------------
# Flush sentinel leak
# ---------------------------------------------------------------------------
class TestFlushSentinelNotLeaked:
"""_flush_sentinel must be stripped before sending messages to the API."""
def test_flush_sentinel_stripped_from_api_messages(self, agent_with_memory_tool):
"""Verify _flush_sentinel is not sent to the API provider."""
agent = agent_with_memory_tool
agent._memory_store = MagicMock()
agent._memory_flush_min_turns = 1
agent._user_turn_count = 10
agent._cached_system_prompt = "system"
messages = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi"},
{"role": "user", "content": "remember this"},
]
# Mock the API to return a simple response (no tool calls)
mock_msg = SimpleNamespace(content="OK", tool_calls=None)
mock_choice = SimpleNamespace(message=mock_msg)
mock_response = SimpleNamespace(choices=[mock_choice])
agent.client.chat.completions.create.return_value = mock_response
# Bypass auxiliary client so flush uses agent.client directly
with patch("agent.auxiliary_client.call_llm", side_effect=RuntimeError("no provider")):
agent.flush_memories(messages, min_turns=0)
# Check what was actually sent to the API
call_args = agent.client.chat.completions.create.call_args
assert call_args is not None, "flush_memories never called the API"
api_messages = call_args.kwargs.get("messages") or call_args[1].get("messages")
for msg in api_messages:
assert "_flush_sentinel" not in msg, (
f"_flush_sentinel leaked to API in message: {msg}"
)
# ---------------------------------------------------------------------------
# Conversation history mutation
# ---------------------------------------------------------------------------
class TestConversationHistoryNotMutated:
"""run_conversation must not mutate the caller's conversation_history list."""
def test_caller_list_unchanged_after_run(self, agent):
"""Passing conversation_history should not modify the original list."""
history = [
{"role": "user", "content": "previous question"},
{"role": "assistant", "content": "previous answer"},
]
original_len = len(history)
resp = _mock_response(content="new answer", finish_reason="stop")
agent.client.chat.completions.create.return_value = resp
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
result = agent.run_conversation(
"new question", conversation_history=history
)
# Caller's list must be untouched
assert len(history) == original_len, (
f"conversation_history was mutated: expected {original_len} items, got {len(history)}"
)
# Result should have more messages than the original history
assert len(result["messages"]) > original_len
# ---------------------------------------------------------------------------
# _max_tokens_param consistency
# ---------------------------------------------------------------------------
class TestNousCredentialRefresh:
"""Verify Nous credential refresh rebuilds the runtime client."""
def test_try_refresh_nous_client_credentials_rebuilds_client(
self, agent, monkeypatch
):
agent.provider = "nous"
agent.api_mode = "chat_completions"
closed = {"value": False}
rebuilt = {"kwargs": None}
captured = {}
class _ExistingClient:
def close(self):
closed["value"] = True
class _RebuiltClient:
pass
def _fake_resolve(**kwargs):
captured.update(kwargs)
return {
"api_key": "new-nous-key",
"base_url": "https://inference-api.nousresearch.com/v1",
}
def _fake_openai(**kwargs):
rebuilt["kwargs"] = kwargs
return _RebuiltClient()
monkeypatch.setattr(
"hermes_cli.auth.resolve_nous_runtime_credentials", _fake_resolve
)
agent.client = _ExistingClient()
with patch("run_agent.OpenAI", side_effect=_fake_openai):
ok = agent._try_refresh_nous_client_credentials(force=True)
assert ok is True
assert closed["value"] is True
assert captured["force_mint"] is True
assert rebuilt["kwargs"]["api_key"] == "new-nous-key"
assert (
rebuilt["kwargs"]["base_url"] == "https://inference-api.nousresearch.com/v1"
)
assert "default_headers" not in rebuilt["kwargs"]
assert isinstance(agent.client, _RebuiltClient)
class TestMaxTokensParam:
"""Verify _max_tokens_param returns the correct key for each provider."""
def test_returns_max_completion_tokens_for_direct_openai(self, agent):
agent.base_url = "https://api.openai.com/v1"
result = agent._max_tokens_param(4096)
assert result == {"max_completion_tokens": 4096}
def test_returns_max_tokens_for_openrouter(self, agent):
agent.base_url = "https://openrouter.ai/api/v1"
result = agent._max_tokens_param(4096)
assert result == {"max_tokens": 4096}
def test_returns_max_tokens_for_local(self, agent):
agent.base_url = "http://localhost:11434/v1"
result = agent._max_tokens_param(4096)
assert result == {"max_tokens": 4096}
def test_not_tricked_by_openai_in_openrouter_url(self, agent):
agent.base_url = "https://openrouter.ai/api/v1/api.openai.com"
result = agent._max_tokens_param(4096)
assert result == {"max_tokens": 4096}
# ---------------------------------------------------------------------------
# System prompt stability for prompt caching
# ---------------------------------------------------------------------------
class TestSystemPromptStability:
"""Verify that the system prompt stays stable across turns for cache hits."""
def test_stored_prompt_reused_for_continuing_session(self, agent):
"""When conversation_history is non-empty and session DB has a stored
prompt, it should be reused instead of rebuilding from disk."""
stored = "You are helpful. [stored from turn 1]"
mock_db = MagicMock()
mock_db.get_session.return_value = {"system_prompt": stored}
agent._session_db = mock_db
# Simulate a continuing session with history
history = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi"},
]
# First call — _cached_system_prompt is None, history is non-empty
agent._cached_system_prompt = None
# Patch run_conversation internals to just test the system prompt logic.
# We'll call the prompt caching block directly by simulating what
# run_conversation does.
conversation_history = history
# The block under test (from run_conversation):
if agent._cached_system_prompt is None:
stored_prompt = None
if conversation_history and agent._session_db:
try:
session_row = agent._session_db.get_session(agent.session_id)
if session_row:
stored_prompt = session_row.get("system_prompt") or None
except Exception:
pass
if stored_prompt:
agent._cached_system_prompt = stored_prompt
assert agent._cached_system_prompt == stored
mock_db.get_session.assert_called_once_with(agent.session_id)
def test_fresh_build_when_no_history(self, agent):
"""On the first turn (no history), system prompt should be built fresh."""
mock_db = MagicMock()
agent._session_db = mock_db
agent._cached_system_prompt = None
conversation_history = []
# The block under test:
if agent._cached_system_prompt is None:
stored_prompt = None
if conversation_history and agent._session_db:
session_row = agent._session_db.get_session(agent.session_id)
if session_row:
stored_prompt = session_row.get("system_prompt") or None
if stored_prompt:
agent._cached_system_prompt = stored_prompt
else:
agent._cached_system_prompt = agent._build_system_prompt()
# Should have built fresh, not queried the DB
mock_db.get_session.assert_not_called()
assert agent._cached_system_prompt is not None
assert "Hermes Agent" in agent._cached_system_prompt
def test_fresh_build_when_db_has_no_prompt(self, agent):
"""If the session DB has no stored prompt, build fresh even with history."""
mock_db = MagicMock()
mock_db.get_session.return_value = {"system_prompt": ""}
agent._session_db = mock_db
agent._cached_system_prompt = None
conversation_history = [{"role": "user", "content": "hi"}]
if agent._cached_system_prompt is None:
stored_prompt = None
if conversation_history and agent._session_db:
try:
session_row = agent._session_db.get_session(agent.session_id)
if session_row:
stored_prompt = session_row.get("system_prompt") or None
except Exception:
pass
if stored_prompt:
agent._cached_system_prompt = stored_prompt
else:
agent._cached_system_prompt = agent._build_system_prompt()
# Empty string is falsy, so should fall through to fresh build
assert "Hermes Agent" in agent._cached_system_prompt
def test_honcho_context_baked_into_prompt_on_first_turn(self, agent):
"""Honcho context should be baked into _cached_system_prompt on
the first turn, not injected separately per API call."""
agent._honcho_context = "User prefers Python over JavaScript."
agent._cached_system_prompt = None
# Simulate first turn: build fresh and bake in Honcho
agent._cached_system_prompt = agent._build_system_prompt()
if agent._honcho_context:
agent._cached_system_prompt = (
agent._cached_system_prompt + "\n\n" + agent._honcho_context
).strip()
assert "User prefers Python over JavaScript" in agent._cached_system_prompt
def test_honcho_prefetch_runs_on_continuing_session(self):
"""Honcho prefetch is consumed on continuing sessions via ephemeral context."""
conversation_history = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi there"},
]
recall_mode = "hybrid"
should_prefetch = bool(conversation_history) and recall_mode != "tools"
assert should_prefetch is True
def test_inject_honcho_turn_context_appends_system_note(self):
content = _inject_honcho_turn_context("hello", "## Honcho Memory\nprior context")
assert "hello" in content
assert "Honcho memory was retrieved from prior sessions" in content
assert "## Honcho Memory" in content
def test_honcho_continuing_session_keeps_turn_context_out_of_system_prompt(self, agent):
captured = {}
def _fake_api_call(api_kwargs):
captured.update(api_kwargs)
return _mock_response(content="done", finish_reason="stop")
agent._honcho = object()
agent._honcho_session_key = "session-1"
agent._honcho_config = SimpleNamespace(
ai_peer="hermes",
memory_mode="hybrid",
write_frequency="async",
recall_mode="hybrid",
)
agent._use_prompt_caching = False
conversation_history = [
{"role": "user", "content": "hello"},
{"role": "assistant", "content": "hi there"},
]
with (
patch.object(agent, "_honcho_prefetch", return_value="## Honcho Memory\nprior context"),
patch.object(agent, "_queue_honcho_prefetch"),
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call),
):
result = agent.run_conversation("what were we doing?", conversation_history=conversation_history)
assert result["completed"] is True
api_messages = captured["messages"]
assert api_messages[0]["role"] == "system"
assert "prior context" not in api_messages[0]["content"]
current_user = api_messages[-1]
assert current_user["role"] == "user"
assert "what were we doing?" in current_user["content"]
assert "prior context" in current_user["content"]
assert "Honcho memory was retrieved from prior sessions" in current_user["content"]
def test_honcho_prefetch_runs_on_first_turn(self):
"""Honcho prefetch should run when conversation_history is empty."""
conversation_history = []
should_prefetch = not conversation_history
assert should_prefetch is True
def test_run_conversation_can_skip_honcho_sync_for_synthetic_turns(self, agent):
captured = {}
def _fake_api_call(api_kwargs):
captured.update(api_kwargs)
return _mock_response(content="done", finish_reason="stop")
agent._honcho = MagicMock()
agent._honcho_session_key = "session-1"
agent._honcho_config = SimpleNamespace(
ai_peer="hermes",
memory_mode="hybrid",
write_frequency="async",
recall_mode="hybrid",
)
agent._use_prompt_caching = False
with (
patch.object(agent, "_honcho_sync") as mock_sync,
patch.object(agent, "_queue_honcho_prefetch") as mock_prefetch,
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
patch.object(agent, "_interruptible_api_call", side_effect=_fake_api_call),
):
result = agent.run_conversation("synthetic flush turn", sync_honcho=False)
assert result["completed"] is True
assert captured["messages"][-1]["content"] == "synthetic flush turn"
mock_sync.assert_not_called()
mock_prefetch.assert_not_called()
class TestHonchoActivation:
def test_disabled_config_skips_honcho_init(self):
hcfg = HonchoClientConfig(
enabled=False,
api_key="honcho-key",
peer_name="user",
ai_peer="hermes",
)
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
patch("honcho_integration.client.get_honcho_client") as mock_client,
):
agent = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=False,
)
assert agent._honcho is None
assert agent._honcho_config is hcfg
mock_client.assert_not_called()
def test_injected_honcho_manager_skips_fresh_client_init(self):
hcfg = HonchoClientConfig(
enabled=True,
api_key="honcho-key",
memory_mode="hybrid",
peer_name="user",
ai_peer="hermes",
recall_mode="hybrid",
)
manager = MagicMock()
manager._config = hcfg
manager.get_or_create.return_value = SimpleNamespace(messages=[])
manager.get_prefetch_context.return_value = {"representation": "Known user", "card": ""}
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("honcho_integration.client.get_honcho_client") as mock_client,
patch("tools.honcho_tools.set_session_context"),
):
agent = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=False,
honcho_session_key="gateway-session",
honcho_manager=manager,
honcho_config=hcfg,
)
assert agent._honcho is manager
manager.get_or_create.assert_called_once_with("gateway-session")
manager.get_prefetch_context.assert_called_once_with("gateway-session")
manager.set_context_result.assert_called_once_with(
"gateway-session",
{"representation": "Known user", "card": ""},
)
mock_client.assert_not_called()
def test_recall_mode_context_suppresses_honcho_tools(self):
hcfg = HonchoClientConfig(
enabled=True,
api_key="honcho-key",
memory_mode="hybrid",
peer_name="user",
ai_peer="hermes",
recall_mode="context",
)
manager = MagicMock()
manager._config = hcfg
manager.get_or_create.return_value = SimpleNamespace(messages=[])
manager.get_prefetch_context.return_value = {"representation": "Known user", "card": ""}
with (
patch(
"run_agent.get_tool_definitions",
side_effect=[
_make_tool_defs("web_search"),
_make_tool_defs(
"web_search",
"honcho_context",
"honcho_profile",
"honcho_search",
"honcho_conclude",
),
],
),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("tools.honcho_tools.set_session_context"),
):
agent = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=False,
honcho_session_key="gateway-session",
honcho_manager=manager,
honcho_config=hcfg,
)
assert "web_search" in agent.valid_tool_names
assert "honcho_context" not in agent.valid_tool_names
assert "honcho_profile" not in agent.valid_tool_names
assert "honcho_search" not in agent.valid_tool_names
assert "honcho_conclude" not in agent.valid_tool_names
def test_inactive_honcho_strips_stale_honcho_tools(self):
hcfg = HonchoClientConfig(
enabled=False,
api_key="honcho-key",
peer_name="user",
ai_peer="hermes",
)
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search", "honcho_context")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
patch("honcho_integration.client.get_honcho_client") as mock_client,
):
agent = AIAgent(
api_key="test-key-1234567890",
quiet_mode=True,
skip_context_files=True,
skip_memory=False,
)
assert agent._honcho is None
assert "web_search" in agent.valid_tool_names
assert "honcho_context" not in agent.valid_tool_names
mock_client.assert_not_called()
class TestHonchoPrefetchScheduling:
def test_honcho_prefetch_includes_cached_dialectic(self, agent):
agent._honcho = MagicMock()
agent._honcho_session_key = "session-key"
agent._honcho.pop_context_result.return_value = {}
agent._honcho.pop_dialectic_result.return_value = "Continue with the migration checklist."
context = agent._honcho_prefetch("what next?")
assert "Continuity synthesis" in context
assert "migration checklist" in context
def test_queue_honcho_prefetch_skips_tools_mode(self, agent):
agent._honcho = MagicMock()
agent._honcho_session_key = "session-key"
agent._honcho_config = HonchoClientConfig(
enabled=True,
api_key="honcho-key",
recall_mode="tools",
)
agent._queue_honcho_prefetch("what next?")
agent._honcho.prefetch_context.assert_not_called()
agent._honcho.prefetch_dialectic.assert_not_called()
def test_queue_honcho_prefetch_runs_when_context_enabled(self, agent):
agent._honcho = MagicMock()
agent._honcho_session_key = "session-key"
agent._honcho_config = HonchoClientConfig(
enabled=True,
api_key="honcho-key",
recall_mode="hybrid",
)
agent._queue_honcho_prefetch("what next?")
agent._honcho.prefetch_context.assert_called_once_with("session-key", "what next?")
agent._honcho.prefetch_dialectic.assert_called_once_with("session-key", "what next?")
# ---------------------------------------------------------------------------
# Iteration budget pressure warnings
# ---------------------------------------------------------------------------
class TestBudgetPressure:
"""Budget pressure warning system (issue #414)."""
def test_no_warning_below_caution(self, agent):
agent.max_iterations = 60
assert agent._get_budget_warning(30) is None
def test_caution_at_70_percent(self, agent):
agent.max_iterations = 60
msg = agent._get_budget_warning(42)
assert msg is not None
assert "[BUDGET:" in msg
assert "18 iterations left" in msg
def test_warning_at_90_percent(self, agent):
agent.max_iterations = 60
msg = agent._get_budget_warning(54)
assert "[BUDGET WARNING:" in msg
assert "Provide your final response NOW" in msg
def test_last_iteration(self, agent):
agent.max_iterations = 60
msg = agent._get_budget_warning(59)
assert "1 iteration(s) left" in msg
def test_disabled(self, agent):
agent.max_iterations = 60
agent._budget_pressure_enabled = False
assert agent._get_budget_warning(55) is None
def test_zero_max_iterations(self, agent):
agent.max_iterations = 0
assert agent._get_budget_warning(0) is None
def test_injects_into_json_tool_result(self, agent):
"""Warning should be injected as _budget_warning field in JSON tool results."""
import json
agent.max_iterations = 10
messages = [
{"role": "tool", "content": json.dumps({"output": "done", "exit_code": 0}), "tool_call_id": "tc1"}
]
warning = agent._get_budget_warning(9)
assert warning is not None
# Simulate the injection logic
last_content = messages[-1]["content"]
parsed = json.loads(last_content)
parsed["_budget_warning"] = warning
messages[-1]["content"] = json.dumps(parsed, ensure_ascii=False)
result = json.loads(messages[-1]["content"])
assert "_budget_warning" in result
assert "BUDGET WARNING" in result["_budget_warning"]
assert result["output"] == "done" # original content preserved
def test_appends_to_non_json_tool_result(self, agent):
"""Warning should be appended as text for non-JSON tool results."""
agent.max_iterations = 10
messages = [
{"role": "tool", "content": "plain text result", "tool_call_id": "tc1"}
]
warning = agent._get_budget_warning(9)
# Simulate injection logic for non-JSON
last_content = messages[-1]["content"]
try:
import json
json.loads(last_content)
except (json.JSONDecodeError, TypeError):
messages[-1]["content"] = last_content + f"\n\n{warning}"
assert "plain text result" in messages[-1]["content"]
assert "BUDGET WARNING" in messages[-1]["content"]
class TestSafeWriter:
"""Verify _SafeWriter guards stdout against OSError (broken pipes)."""
def test_write_delegates_normally(self):
"""When stdout is healthy, _SafeWriter is transparent."""
from run_agent import _SafeWriter
from io import StringIO
inner = StringIO()
writer = _SafeWriter(inner)
writer.write("hello")
assert inner.getvalue() == "hello"
def test_write_catches_oserror(self):
"""OSError on write is silently caught, returns len(data)."""
from run_agent import _SafeWriter
from unittest.mock import MagicMock
inner = MagicMock()
inner.write.side_effect = OSError(5, "Input/output error")
writer = _SafeWriter(inner)
result = writer.write("hello")
assert result == 5 # len("hello")
def test_flush_catches_oserror(self):
"""OSError on flush is silently caught."""
from run_agent import _SafeWriter
from unittest.mock import MagicMock
inner = MagicMock()
inner.flush.side_effect = OSError(5, "Input/output error")
writer = _SafeWriter(inner)
writer.flush() # should not raise
def test_print_survives_broken_stdout(self, monkeypatch):
"""print() through _SafeWriter doesn't crash on broken pipe."""
import sys
from run_agent import _SafeWriter
from unittest.mock import MagicMock
broken = MagicMock()
broken.write.side_effect = OSError(5, "Input/output error")
original = sys.stdout
sys.stdout = _SafeWriter(broken)
try:
print("this should not crash") # would raise without _SafeWriter
finally:
sys.stdout = original
def test_installed_in_run_conversation(self, agent):
2026-03-14 02:19:46 -07:00
"""run_conversation installs _SafeWriter on stdio."""
import sys
from run_agent import _SafeWriter
resp = _mock_response(content="Done", finish_reason="stop")
agent.client.chat.completions.create.return_value = resp
2026-03-14 02:19:46 -07:00
original_stdout = sys.stdout
original_stderr = sys.stderr
try:
with (
patch.object(agent, "_persist_session"),
patch.object(agent, "_save_trajectory"),
patch.object(agent, "_cleanup_task_resources"),
):
agent.run_conversation("test")
assert isinstance(sys.stdout, _SafeWriter)
2026-03-14 02:19:46 -07:00
assert isinstance(sys.stderr, _SafeWriter)
finally:
sys.stdout = original_stdout
sys.stderr = original_stderr
def test_installed_before_init_time_honcho_error_prints(self):
"""AIAgent.__init__ wraps stdout before Honcho fallback prints can fire."""
import sys
from run_agent import _SafeWriter
broken = MagicMock()
broken.write.side_effect = OSError(5, "Input/output error")
broken.flush.side_effect = OSError(5, "Input/output error")
original = sys.stdout
sys.stdout = broken
try:
hcfg = HonchoClientConfig(enabled=True, api_key="test-honcho-key")
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI"),
patch("hermes_cli.config.load_config", return_value={"memory": {}}),
patch("honcho_integration.client.HonchoClientConfig.from_global_config", return_value=hcfg),
patch("honcho_integration.client.get_honcho_client", side_effect=RuntimeError("boom")),
):
agent = AIAgent(
api_key="test-k...7890",
quiet_mode=True,
skip_context_files=True,
skip_memory=False,
)
assert isinstance(sys.stdout, _SafeWriter)
assert agent._honcho is None
finally:
sys.stdout = original
def test_double_wrap_prevented(self):
"""Wrapping an already-wrapped stream doesn't add layers."""
import sys
from run_agent import _SafeWriter
from io import StringIO
inner = StringIO()
wrapped = _SafeWriter(inner)
# isinstance check should prevent double-wrapping
assert isinstance(wrapped, _SafeWriter)
# The guard in run_conversation checks isinstance before wrapping
if not isinstance(wrapped, _SafeWriter):
wrapped = _SafeWriter(wrapped)
# Still just one layer
wrapped.write("test")
assert inner.getvalue() == "test"
class TestSaveSessionLogAtomicWrite:
def test_uses_shared_atomic_json_helper(self, agent, tmp_path):
agent.session_log_file = tmp_path / "session.json"
messages = [{"role": "user", "content": "hello"}]
with patch("run_agent.atomic_json_write", create=True) as mock_atomic_write:
agent._save_session_log(messages)
mock_atomic_write.assert_called_once()
call_args = mock_atomic_write.call_args
assert call_args.args[0] == agent.session_log_file
payload = call_args.args[1]
assert payload["session_id"] == agent.session_id
assert payload["messages"] == messages
assert call_args.kwargs["indent"] == 2
assert call_args.kwargs["default"] is str
# ===================================================================
# Anthropic adapter integration fixes
# ===================================================================
class TestBuildApiKwargsAnthropicMaxTokens:
"""Bug fix: max_tokens was always None for Anthropic mode, ignoring user config."""
def test_max_tokens_passed_to_anthropic(self, agent):
agent.api_mode = "anthropic_messages"
agent.max_tokens = 4096
agent.reasoning_config = None
with patch("agent.anthropic_adapter.build_anthropic_kwargs") as mock_build:
mock_build.return_value = {"model": "claude-sonnet-4-20250514", "messages": [], "max_tokens": 4096}
agent._build_api_kwargs([{"role": "user", "content": "test"}])
_, kwargs = mock_build.call_args
if not kwargs:
kwargs = dict(zip(
["model", "messages", "tools", "max_tokens", "reasoning_config"],
mock_build.call_args[0],
))
assert kwargs.get("max_tokens") == 4096 or mock_build.call_args[1].get("max_tokens") == 4096
def test_max_tokens_none_when_unset(self, agent):
agent.api_mode = "anthropic_messages"
agent.max_tokens = None
agent.reasoning_config = None
with patch("agent.anthropic_adapter.build_anthropic_kwargs") as mock_build:
mock_build.return_value = {"model": "claude-sonnet-4-20250514", "messages": [], "max_tokens": 16384}
agent._build_api_kwargs([{"role": "user", "content": "test"}])
call_args = mock_build.call_args
# max_tokens should be None (let adapter use its default)
if call_args[1]:
assert call_args[1].get("max_tokens") is None
else:
assert call_args[0][3] is None
class TestAnthropicImageFallback:
def test_build_api_kwargs_converts_multimodal_user_image_to_text(self, agent):
agent.api_mode = "anthropic_messages"
agent.reasoning_config = None
api_messages = [{
"role": "user",
"content": [
{"type": "text", "text": "Can you see this now?"},
{"type": "image_url", "image_url": {"url": "https://example.com/cat.png"}},
],
}]
with (
patch("tools.vision_tools.vision_analyze_tool", new=AsyncMock(return_value=json.dumps({"success": True, "analysis": "A cat sitting on a chair."}))),
patch("agent.anthropic_adapter.build_anthropic_kwargs") as mock_build,
):
mock_build.return_value = {"model": "claude-sonnet-4-20250514", "messages": [], "max_tokens": 4096}
agent._build_api_kwargs(api_messages)
kwargs = mock_build.call_args.kwargs or dict(zip(
["model", "messages", "tools", "max_tokens", "reasoning_config"],
mock_build.call_args.args,
))
transformed = kwargs["messages"]
assert isinstance(transformed[0]["content"], str)
assert "A cat sitting on a chair." in transformed[0]["content"]
assert "Can you see this now?" in transformed[0]["content"]
assert "vision_analyze with image_url: https://example.com/cat.png" in transformed[0]["content"]
def test_build_api_kwargs_reuses_cached_image_analysis_for_duplicate_images(self, agent):
agent.api_mode = "anthropic_messages"
agent.reasoning_config = None
data_url = "data:image/png;base64,QUFBQQ=="
api_messages = [
{
"role": "user",
"content": [
{"type": "text", "text": "first"},
{"type": "input_image", "image_url": data_url},
],
},
{
"role": "user",
"content": [
{"type": "text", "text": "second"},
{"type": "input_image", "image_url": data_url},
],
},
]
mock_vision = AsyncMock(return_value=json.dumps({"success": True, "analysis": "A small test image."}))
with (
patch("tools.vision_tools.vision_analyze_tool", new=mock_vision),
patch("agent.anthropic_adapter.build_anthropic_kwargs") as mock_build,
):
mock_build.return_value = {"model": "claude-sonnet-4-20250514", "messages": [], "max_tokens": 4096}
agent._build_api_kwargs(api_messages)
assert mock_vision.await_count == 1
class TestFallbackAnthropicProvider:
"""Bug fix: _try_activate_fallback had no case for anthropic provider."""
def test_fallback_to_anthropic_sets_api_mode(self, agent):
agent._fallback_activated = False
agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-20250514"}
agent._fallback_chain = [agent._fallback_model]
agent._fallback_index = 0
mock_client = MagicMock()
mock_client.base_url = "https://api.anthropic.com/v1"
mock_client.api_key = "sk-ant-api03-test"
with (
patch("agent.auxiliary_client.resolve_provider_client", return_value=(mock_client, None)),
patch("agent.anthropic_adapter.build_anthropic_client") as mock_build,
patch("agent.anthropic_adapter.resolve_anthropic_token", return_value=None),
):
mock_build.return_value = MagicMock()
result = agent._try_activate_fallback()
assert result is True
assert agent.api_mode == "anthropic_messages"
assert agent._anthropic_client is not None
assert agent.client is None
def test_fallback_to_anthropic_enables_prompt_caching(self, agent):
agent._fallback_activated = False
agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-20250514"}
agent._fallback_chain = [agent._fallback_model]
agent._fallback_index = 0
mock_client = MagicMock()
mock_client.base_url = "https://api.anthropic.com/v1"
mock_client.api_key = "sk-ant-api03-test"
with (
patch("agent.auxiliary_client.resolve_provider_client", return_value=(mock_client, None)),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
patch("agent.anthropic_adapter.resolve_anthropic_token", return_value=None),
):
agent._try_activate_fallback()
assert agent._use_prompt_caching is True
def test_fallback_to_openrouter_uses_openai_client(self, agent):
agent._fallback_activated = False
agent._fallback_model = {"provider": "openrouter", "model": "anthropic/claude-sonnet-4"}
agent._fallback_chain = [agent._fallback_model]
agent._fallback_index = 0
mock_client = MagicMock()
mock_client.base_url = "https://openrouter.ai/api/v1"
mock_client.api_key = "sk-or-test"
with patch("agent.auxiliary_client.resolve_provider_client", return_value=(mock_client, None)):
result = agent._try_activate_fallback()
assert result is True
assert agent.api_mode == "chat_completions"
assert agent.client is mock_client
def test_aiagent_uses_copilot_acp_client():
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("run_agent.OpenAI") as mock_openai,
patch("agent.copilot_acp_client.CopilotACPClient") as mock_acp_client,
):
acp_client = MagicMock()
mock_acp_client.return_value = acp_client
agent = AIAgent(
api_key="copilot-acp",
base_url="acp://copilot",
provider="copilot-acp",
acp_command="/usr/local/bin/copilot",
acp_args=["--acp", "--stdio"],
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
assert agent.client is acp_client
mock_openai.assert_not_called()
mock_acp_client.assert_called_once()
assert mock_acp_client.call_args.kwargs["base_url"] == "acp://copilot"
assert mock_acp_client.call_args.kwargs["api_key"] == "copilot-acp"
assert mock_acp_client.call_args.kwargs["command"] == "/usr/local/bin/copilot"
assert mock_acp_client.call_args.kwargs["args"] == ["--acp", "--stdio"]
def test_is_openai_client_closed_honors_custom_client_flag():
assert AIAgent._is_openai_client_closed(SimpleNamespace(is_closed=True)) is True
assert AIAgent._is_openai_client_closed(SimpleNamespace(is_closed=False)) is False
class TestAnthropicBaseUrlPassthrough:
"""Bug fix: base_url was filtered with 'anthropic in base_url', blocking proxies."""
def test_custom_proxy_base_url_passed_through(self):
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter.build_anthropic_client") as mock_build,
):
mock_build.return_value = MagicMock()
a = AIAgent(
api_key="sk-ant-api03-test1234567890",
base_url="https://llm-proxy.company.com/v1",
api_mode="anthropic_messages",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
call_args = mock_build.call_args
# base_url should be passed through, not filtered out
assert call_args[0][1] == "https://llm-proxy.company.com/v1"
def test_none_base_url_passed_as_none(self):
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter.build_anthropic_client") as mock_build,
):
mock_build.return_value = MagicMock()
a = AIAgent(
api_key="sk-ant-api03-test1234567890",
api_mode="anthropic_messages",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
call_args = mock_build.call_args
# No base_url provided, should be default empty string or None
passed_url = call_args[0][1]
assert not passed_url or passed_url is None
class TestAnthropicCredentialRefresh:
def test_try_refresh_anthropic_client_credentials_rebuilds_client(self):
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter.build_anthropic_client") as mock_build,
):
old_client = MagicMock()
new_client = MagicMock()
mock_build.side_effect = [old_client, new_client]
agent = AIAgent(
api_key="sk-ant-oat01-stale-token",
api_mode="anthropic_messages",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
agent._anthropic_client = old_client
agent._anthropic_api_key = "sk-ant-oat01-stale-token"
agent._anthropic_base_url = "https://api.anthropic.com"
agent.provider = "anthropic"
with (
patch("agent.anthropic_adapter.resolve_anthropic_token", return_value="sk-ant-oat01-fresh-token"),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=new_client) as rebuild,
):
assert agent._try_refresh_anthropic_client_credentials() is True
old_client.close.assert_called_once()
rebuild.assert_called_once_with("sk-ant-oat01-fresh-token", "https://api.anthropic.com")
assert agent._anthropic_client is new_client
assert agent._anthropic_api_key == "sk-ant-oat01-fresh-token"
def test_try_refresh_anthropic_client_credentials_returns_false_when_token_unchanged(self):
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
):
agent = AIAgent(
api_key="sk-ant-oat01-same-token",
api_mode="anthropic_messages",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
old_client = MagicMock()
agent._anthropic_client = old_client
agent._anthropic_api_key = "sk-ant-oat01-same-token"
with (
patch("agent.anthropic_adapter.resolve_anthropic_token", return_value="sk-ant-oat01-same-token"),
patch("agent.anthropic_adapter.build_anthropic_client") as rebuild,
):
assert agent._try_refresh_anthropic_client_credentials() is False
old_client.close.assert_not_called()
rebuild.assert_not_called()
def test_anthropic_messages_create_preflights_refresh(self):
with (
patch("run_agent.get_tool_definitions", return_value=_make_tool_defs("web_search")),
patch("run_agent.check_toolset_requirements", return_value={}),
patch("agent.anthropic_adapter.build_anthropic_client", return_value=MagicMock()),
):
agent = AIAgent(
api_key="sk-ant-oat01-current-token",
api_mode="anthropic_messages",
quiet_mode=True,
skip_context_files=True,
skip_memory=True,
)
response = SimpleNamespace(content=[])
agent._anthropic_client = MagicMock()
agent._anthropic_client.messages.create.return_value = response
with patch.object(agent, "_try_refresh_anthropic_client_credentials", return_value=True) as refresh:
result = agent._anthropic_messages_create({"model": "claude-sonnet-4-20250514"})
refresh.assert_called_once_with()
agent._anthropic_client.messages.create.assert_called_once_with(model="claude-sonnet-4-20250514")
assert result is response
# ===================================================================
# _streaming_api_call tests
# ===================================================================
def _make_chunk(content=None, tool_calls=None, finish_reason=None, model="test/model"):
"""Build a SimpleNamespace mimicking an OpenAI streaming chunk."""
delta = SimpleNamespace(content=content, tool_calls=tool_calls)
choice = SimpleNamespace(delta=delta, finish_reason=finish_reason)
return SimpleNamespace(model=model, choices=[choice])
def _make_tc_delta(index=0, tc_id=None, name=None, arguments=None):
"""Build a SimpleNamespace mimicking a streaming tool_call delta."""
func = SimpleNamespace(name=name, arguments=arguments)
return SimpleNamespace(index=index, id=tc_id, function=func)
class TestStreamingApiCall:
"""Tests for _streaming_api_call — voice TTS streaming pipeline."""
def test_content_assembly(self, agent):
chunks = [
_make_chunk(content="Hel"),
_make_chunk(content="lo "),
_make_chunk(content="World"),
_make_chunk(finish_reason="stop"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
callback = MagicMock()
agent.stream_delta_callback = callback
resp = agent._interruptible_streaming_api_call({"messages": []})
assert resp.choices[0].message.content == "Hello World"
assert resp.choices[0].finish_reason == "stop"
assert callback.call_count == 3
callback.assert_any_call("Hel")
callback.assert_any_call("lo ")
callback.assert_any_call("World")
def test_tool_call_accumulation(self, agent):
chunks = [
_make_chunk(tool_calls=[_make_tc_delta(0, "call_1", "web_", '{"q":')]),
_make_chunk(tool_calls=[_make_tc_delta(0, None, "search", '"test"}')]),
_make_chunk(finish_reason="tool_calls"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"messages": []})
tc = resp.choices[0].message.tool_calls
assert len(tc) == 1
assert tc[0].function.name == "web_search"
assert tc[0].function.arguments == '{"q":"test"}'
assert tc[0].id == "call_1"
def test_multiple_tool_calls(self, agent):
chunks = [
_make_chunk(tool_calls=[_make_tc_delta(0, "call_a", "search", '{}')]),
_make_chunk(tool_calls=[_make_tc_delta(1, "call_b", "read", '{}')]),
_make_chunk(finish_reason="tool_calls"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"messages": []})
tc = resp.choices[0].message.tool_calls
assert len(tc) == 2
assert tc[0].function.name == "search"
assert tc[1].function.name == "read"
def test_ollama_reused_index_separate_tool_calls(self, agent):
"""Ollama sends every tool call at index 0 with different ids.
Without the fix, names and arguments get concatenated into one slot.
"""
chunks = [
_make_chunk(tool_calls=[_make_tc_delta(0, "call_a", "search", '{"q":"hello"}')]),
# Second tool call at the SAME index 0, but different id
_make_chunk(tool_calls=[_make_tc_delta(0, "call_b", "read_file", '{"path":"x.py"}')]),
_make_chunk(finish_reason="tool_calls"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"messages": []})
tc = resp.choices[0].message.tool_calls
assert len(tc) == 2, f"Expected 2 tool calls, got {len(tc)}: {[t.function.name for t in tc]}"
assert tc[0].function.name == "search"
assert tc[0].function.arguments == '{"q":"hello"}'
assert tc[0].id == "call_a"
assert tc[1].function.name == "read_file"
assert tc[1].function.arguments == '{"path":"x.py"}'
assert tc[1].id == "call_b"
def test_ollama_reused_index_streamed_args(self, agent):
"""Ollama with streamed arguments across multiple chunks at same index."""
chunks = [
_make_chunk(tool_calls=[_make_tc_delta(0, "call_a", "search", '{"q":')]),
_make_chunk(tool_calls=[_make_tc_delta(0, None, None, '"hello"}')]),
# New tool call, same index 0
_make_chunk(tool_calls=[_make_tc_delta(0, "call_b", "read", '{}')]),
_make_chunk(finish_reason="tool_calls"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"messages": []})
tc = resp.choices[0].message.tool_calls
assert len(tc) == 2
assert tc[0].function.name == "search"
assert tc[0].function.arguments == '{"q":"hello"}'
assert tc[1].function.name == "read"
assert tc[1].function.arguments == '{}'
def test_content_and_tool_calls_together(self, agent):
chunks = [
_make_chunk(content="I'll search"),
_make_chunk(tool_calls=[_make_tc_delta(0, "call_1", "search", '{}')]),
_make_chunk(finish_reason="tool_calls"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"messages": []})
assert resp.choices[0].message.content == "I'll search"
assert len(resp.choices[0].message.tool_calls) == 1
def test_empty_content_returns_none(self, agent):
chunks = [_make_chunk(finish_reason="stop")]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"messages": []})
assert resp.choices[0].message.content is None
assert resp.choices[0].message.tool_calls is None
def test_callback_exception_swallowed(self, agent):
chunks = [
_make_chunk(content="Hello"),
_make_chunk(content=" World"),
_make_chunk(finish_reason="stop"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
agent.stream_delta_callback = MagicMock(side_effect=ValueError("boom"))
resp = agent._interruptible_streaming_api_call({"messages": []})
assert resp.choices[0].message.content == "Hello World"
def test_model_name_captured(self, agent):
chunks = [
_make_chunk(content="Hi", model="gpt-4o"),
_make_chunk(finish_reason="stop", model="gpt-4o"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"messages": []})
assert resp.model == "gpt-4o"
def test_stream_kwarg_injected(self, agent):
chunks = [_make_chunk(content="x"), _make_chunk(finish_reason="stop")]
agent.client.chat.completions.create.return_value = iter(chunks)
agent._interruptible_streaming_api_call({"messages": [], "model": "test"})
call_kwargs = agent.client.chat.completions.create.call_args
assert call_kwargs[1].get("stream") is True or call_kwargs.kwargs.get("stream") is True
def test_api_exception_falls_back_to_non_streaming(self, agent):
"""When streaming fails before any deltas, fallback to non-streaming is attempted."""
agent.client.chat.completions.create.side_effect = ConnectionError("fail")
# The fallback also uses the same client, so it'll fail too
with pytest.raises(ConnectionError, match="fail"):
agent._interruptible_streaming_api_call({"messages": []})
def test_response_has_uuid_id(self, agent):
chunks = [_make_chunk(content="x"), _make_chunk(finish_reason="stop")]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"messages": []})
assert resp.id.startswith("stream-")
assert len(resp.id) > len("stream-")
def test_empty_choices_chunk_skipped(self, agent):
empty_chunk = SimpleNamespace(model="gpt-4", choices=[])
chunks = [
empty_chunk,
_make_chunk(content="Hello", model="gpt-4"),
_make_chunk(finish_reason="stop", model="gpt-4"),
]
agent.client.chat.completions.create.return_value = iter(chunks)
resp = agent._interruptible_streaming_api_call({"messages": []})
assert resp.choices[0].message.content == "Hello"
assert resp.model == "gpt-4"
# ===================================================================
# Interrupt _vprint force=True verification
# ===================================================================
class TestInterruptVprintForceTrue:
"""All interrupt _vprint calls must use force=True so they are always visible."""
def test_all_interrupt_vprint_have_force_true(self):
"""Scan source for _vprint calls containing 'Interrupt' — each must have force=True."""
import inspect
source = inspect.getsource(AIAgent)
lines = source.split("\n")
violations = []
for i, line in enumerate(lines, 1):
stripped = line.strip()
if "_vprint(" in stripped and "Interrupt" in stripped:
if "force=True" not in stripped:
violations.append(f"line {i}: {stripped}")
assert not violations, (
f"Interrupt _vprint calls missing force=True:\n"
+ "\n".join(violations)
)
# ===================================================================
# Anthropic interrupt handler in _interruptible_api_call
# ===================================================================
class TestAnthropicInterruptHandler:
"""_interruptible_api_call must handle Anthropic mode when interrupted."""
def test_interruptible_has_anthropic_branch(self):
"""The interrupt handler must check api_mode == 'anthropic_messages'."""
import inspect
source = inspect.getsource(AIAgent._interruptible_api_call)
assert "anthropic_messages" in source, \
"_interruptible_api_call must handle Anthropic interrupt (api_mode check)"
def test_interruptible_rebuilds_anthropic_client(self):
"""After interrupting, the Anthropic client should be rebuilt."""
import inspect
source = inspect.getsource(AIAgent._interruptible_api_call)
assert "build_anthropic_client" in source, \
"_interruptible_api_call must rebuild Anthropic client after interrupt"
def test_streaming_has_anthropic_branch(self):
"""_streaming_api_call must also handle Anthropic interrupt."""
import inspect
source = inspect.getsource(AIAgent._interruptible_streaming_api_call)
assert "anthropic_messages" in source, \
"_streaming_api_call must handle Anthropic interrupt"
# ---------------------------------------------------------------------------
# Bugfix: stream_callback forwarding for non-streaming providers
# ---------------------------------------------------------------------------
class TestStreamCallbackNonStreamingProvider:
"""When api_mode != chat_completions, stream_callback must still receive
the response content so TTS works (batch delivery)."""
def test_callback_receives_chat_completions_response(self, agent):
"""For chat_completions-shaped responses, callback gets content."""
agent.api_mode = "anthropic_messages"
mock_response = SimpleNamespace(
choices=[SimpleNamespace(
message=SimpleNamespace(content="Hello", tool_calls=None, reasoning_content=None),
finish_reason="stop", index=0,
)],
usage=None, model="test", id="test-id",
)
agent._interruptible_api_call = MagicMock(return_value=mock_response)
received = []
cb = lambda delta: received.append(delta)
agent._stream_callback = cb
_cb = getattr(agent, "_stream_callback", None)
response = agent._interruptible_api_call({})
if _cb is not None and response:
try:
if agent.api_mode == "anthropic_messages":
text_parts = [
block.text for block in getattr(response, "content", [])
if getattr(block, "type", None) == "text" and getattr(block, "text", None)
]
content = " ".join(text_parts) if text_parts else None
else:
content = response.choices[0].message.content
if content:
_cb(content)
except Exception:
pass
# Anthropic format not matched above; fallback via except
# Test the actual code path by checking chat_completions branch
received2 = []
agent.api_mode = "some_other_mode"
agent._stream_callback = lambda d: received2.append(d)
_cb2 = agent._stream_callback
if _cb2 is not None and mock_response:
try:
content = mock_response.choices[0].message.content
if content:
_cb2(content)
except Exception:
pass
assert received2 == ["Hello"]
def test_callback_receives_anthropic_content(self, agent):
"""For Anthropic responses, text blocks are extracted and forwarded."""
agent.api_mode = "anthropic_messages"
mock_response = SimpleNamespace(
content=[SimpleNamespace(type="text", text="Hello from Claude")],
stop_reason="end_turn",
)
received = []
cb = lambda d: received.append(d)
agent._stream_callback = cb
_cb = agent._stream_callback
if _cb is not None and mock_response:
try:
if agent.api_mode == "anthropic_messages":
text_parts = [
block.text for block in getattr(mock_response, "content", [])
if getattr(block, "type", None) == "text" and getattr(block, "text", None)
]
content = " ".join(text_parts) if text_parts else None
else:
content = mock_response.choices[0].message.content
if content:
_cb(content)
except Exception:
pass
assert received == ["Hello from Claude"]
# ---------------------------------------------------------------------------
# Bugfix: API-only user message prefixes must not persist
# ---------------------------------------------------------------------------
class TestPersistUserMessageOverride:
"""Synthetic API-only user prefixes should never leak into transcripts."""
def test_persist_session_rewrites_current_turn_user_message(self, agent):
agent._session_db = MagicMock()
agent.session_id = "session-123"
agent._last_flushed_db_idx = 0
agent._persist_user_message_idx = 0
agent._persist_user_message_override = "Hello there"
messages = [
{
"role": "user",
"content": (
"[Voice input — respond concisely and conversationally, "
"2-3 sentences max. No code blocks or markdown.] Hello there"
),
},
{"role": "assistant", "content": "Hi!"},
]
with patch.object(agent, "_save_session_log") as mock_save:
agent._persist_session(messages, [])
assert messages[0]["content"] == "Hello there"
saved_messages = mock_save.call_args.args[0]
assert saved_messages[0]["content"] == "Hello there"
first_db_write = agent._session_db.append_message.call_args_list[0].kwargs
assert first_db_write["content"] == "Hello there"
# ---------------------------------------------------------------------------
# Bugfix: _vprint force=True on error messages during TTS
# ---------------------------------------------------------------------------
class TestVprintForceOnErrors:
"""Error/warning messages must be visible during streaming TTS."""
def test_forced_message_shown_during_tts(self, agent):
agent._stream_callback = lambda x: None
printed = []
with patch("builtins.print", side_effect=lambda *a, **kw: printed.append(a)):
agent._vprint("error msg", force=True)
assert len(printed) == 1
def test_non_forced_suppressed_during_tts(self, agent):
agent._stream_callback = lambda x: None
printed = []
with patch("builtins.print", side_effect=lambda *a, **kw: printed.append(a)):
agent._vprint("debug info")
assert len(printed) == 0
def test_all_shown_without_tts(self, agent):
agent._stream_callback = None
printed = []
with patch("builtins.print", side_effect=lambda *a, **kw: printed.append(a)):
agent._vprint("debug")
agent._vprint("error", force=True)
assert len(printed) == 2
class TestNormalizeCodexDictArguments:
"""_normalize_codex_response must produce valid JSON strings for tool
call arguments, even when the Responses API returns them as dicts."""
def _make_codex_response(self, item_type, arguments, item_status="completed"):
"""Build a minimal Responses API response with a single tool call."""
item = SimpleNamespace(
type=item_type,
status=item_status,
)
if item_type == "function_call":
item.name = "web_search"
item.arguments = arguments
item.call_id = "call_abc123"
item.id = "fc_abc123"
elif item_type == "custom_tool_call":
item.name = "web_search"
item.input = arguments
item.call_id = "call_abc123"
item.id = "fc_abc123"
return SimpleNamespace(
output=[item],
status="completed",
)
def test_function_call_dict_arguments_produce_valid_json(self, agent):
"""dict arguments from function_call must be serialised with
json.dumps, not str(), so downstream json.loads() succeeds."""
args_dict = {"query": "weather in NYC", "units": "celsius"}
response = self._make_codex_response("function_call", args_dict)
msg, _ = agent._normalize_codex_response(response)
tc = msg.tool_calls[0]
parsed = json.loads(tc.function.arguments)
assert parsed == args_dict
def test_custom_tool_call_dict_arguments_produce_valid_json(self, agent):
"""dict arguments from custom_tool_call must also use json.dumps."""
args_dict = {"path": "/tmp/test.txt", "content": "hello"}
response = self._make_codex_response("custom_tool_call", args_dict)
msg, _ = agent._normalize_codex_response(response)
tc = msg.tool_calls[0]
parsed = json.loads(tc.function.arguments)
assert parsed == args_dict
def test_string_arguments_unchanged(self, agent):
"""String arguments must pass through without modification."""
args_str = '{"query": "test"}'
response = self._make_codex_response("function_call", args_str)
msg, _ = agent._normalize_codex_response(response)
tc = msg.tool_calls[0]
assert tc.function.arguments == args_str
# ---------------------------------------------------------------------------
# OAuth flag and nudge counter fixes (salvaged from PR #1797)
# ---------------------------------------------------------------------------
class TestOAuthFlagAfterCredentialRefresh:
"""_is_anthropic_oauth must update when token type changes during refresh."""
def test_oauth_flag_updates_api_key_to_oauth(self, agent):
"""Refreshing from API key to OAuth token must set flag to True."""
agent.api_mode = "anthropic_messages"
agent.provider = "anthropic"
agent._anthropic_api_key = "sk-ant-api-old"
agent._anthropic_client = MagicMock()
agent._is_anthropic_oauth = False
with (
patch("agent.anthropic_adapter.resolve_anthropic_token",
return_value="sk-ant-setup-oauth-token"),
patch("agent.anthropic_adapter.build_anthropic_client",
return_value=MagicMock()),
):
result = agent._try_refresh_anthropic_client_credentials()
assert result is True
assert agent._is_anthropic_oauth is True
def test_oauth_flag_updates_oauth_to_api_key(self, agent):
"""Refreshing from OAuth to API key must set flag to False."""
agent.api_mode = "anthropic_messages"
agent.provider = "anthropic"
agent._anthropic_api_key = "sk-ant-setup-old"
agent._anthropic_client = MagicMock()
agent._is_anthropic_oauth = True
with (
patch("agent.anthropic_adapter.resolve_anthropic_token",
return_value="sk-ant-api03-new-key"),
patch("agent.anthropic_adapter.build_anthropic_client",
return_value=MagicMock()),
):
result = agent._try_refresh_anthropic_client_credentials()
assert result is True
assert agent._is_anthropic_oauth is False
class TestFallbackSetsOAuthFlag:
"""_try_activate_fallback must set _is_anthropic_oauth for Anthropic fallbacks."""
def test_fallback_to_anthropic_oauth_sets_flag(self, agent):
agent._fallback_activated = False
agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-6"}
agent._fallback_chain = [agent._fallback_model]
agent._fallback_index = 0
mock_client = MagicMock()
mock_client.base_url = "https://api.anthropic.com/v1"
mock_client.api_key = "sk-ant-setup-oauth-token"
with (
patch("agent.auxiliary_client.resolve_provider_client",
return_value=(mock_client, None)),
patch("agent.anthropic_adapter.build_anthropic_client",
return_value=MagicMock()),
patch("agent.anthropic_adapter.resolve_anthropic_token",
return_value=None),
):
result = agent._try_activate_fallback()
assert result is True
assert agent._is_anthropic_oauth is True
def test_fallback_to_anthropic_api_key_clears_flag(self, agent):
agent._fallback_activated = False
agent._fallback_model = {"provider": "anthropic", "model": "claude-sonnet-4-6"}
agent._fallback_chain = [agent._fallback_model]
agent._fallback_index = 0
mock_client = MagicMock()
mock_client.base_url = "https://api.anthropic.com/v1"
mock_client.api_key = "sk-ant-api03-regular-key"
with (
patch("agent.auxiliary_client.resolve_provider_client",
return_value=(mock_client, None)),
patch("agent.anthropic_adapter.build_anthropic_client",
return_value=MagicMock()),
patch("agent.anthropic_adapter.resolve_anthropic_token",
return_value=None),
):
result = agent._try_activate_fallback()
assert result is True
assert agent._is_anthropic_oauth is False
class TestMemoryNudgeCounterPersistence:
"""_turns_since_memory must persist across run_conversation calls."""
def test_counters_initialized_in_init(self):
"""Counters must exist on the agent after __init__."""
with patch("run_agent.get_tool_definitions", return_value=[]):
a = AIAgent(
model="test", api_key="test-key", provider="openrouter",
skip_context_files=True, skip_memory=True,
)
assert hasattr(a, "_turns_since_memory")
assert hasattr(a, "_iters_since_skill")
assert a._turns_since_memory == 0
assert a._iters_since_skill == 0
def test_counters_not_reset_in_preamble(self):
"""The run_conversation preamble must not zero the nudge counters."""
import inspect
src = inspect.getsource(AIAgent.run_conversation)
# The preamble resets many fields (retry counts, budget, etc.)
# before the main loop. Find that reset block and verify our
# counters aren't in it. The reset block ends at iteration_budget.
preamble_end = src.index("self.iteration_budget = IterationBudget")
preamble = src[:preamble_end]
assert "self._turns_since_memory = 0" not in preamble
assert "self._iters_since_skill = 0" not in preamble
class TestDeadRetryCode:
"""Unreachable retry_count >= max_retries after raise must not exist."""
def test_no_unreachable_max_retries_after_backoff(self):
import inspect
source = inspect.getsource(AIAgent.run_conversation)
occurrences = source.count("if retry_count >= max_retries:")
assert occurrences == 2, (
f"Expected 2 occurrences of 'if retry_count >= max_retries:' "
f"but found {occurrences}"
)