This commit was merged in pull request #441.
This commit is contained in:
386
tests/timmy/test_agentic_loop.py
Normal file
386
tests/timmy/test_agentic_loop.py
Normal file
@@ -0,0 +1,386 @@
|
||||
"""Tests for timmy.agentic_loop — multi-step task execution engine."""
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.agentic_loop import (
|
||||
AgenticResult,
|
||||
AgenticStep,
|
||||
_parse_steps,
|
||||
)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data structures
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestAgenticStep:
|
||||
"""Unit tests for the AgenticStep dataclass."""
|
||||
|
||||
def test_creation(self):
|
||||
step = AgenticStep(
|
||||
step_num=1,
|
||||
description="Do thing",
|
||||
result="Done",
|
||||
status="completed",
|
||||
duration_ms=42,
|
||||
)
|
||||
assert step.step_num == 1
|
||||
assert step.description == "Do thing"
|
||||
assert step.result == "Done"
|
||||
assert step.status == "completed"
|
||||
assert step.duration_ms == 42
|
||||
|
||||
def test_failed_status(self):
|
||||
step = AgenticStep(
|
||||
step_num=2, description="Bad step", result="Error", status="failed", duration_ms=10
|
||||
)
|
||||
assert step.status == "failed"
|
||||
|
||||
def test_adapted_status(self):
|
||||
step = AgenticStep(
|
||||
step_num=3, description="Retried", result="OK", status="adapted", duration_ms=100
|
||||
)
|
||||
assert step.status == "adapted"
|
||||
|
||||
|
||||
class TestAgenticResult:
|
||||
"""Unit tests for the AgenticResult dataclass."""
|
||||
|
||||
def test_defaults(self):
|
||||
result = AgenticResult(task_id="abc", task="Test", summary="Done")
|
||||
assert result.steps == []
|
||||
assert result.status == "completed"
|
||||
assert result.total_duration_ms == 0
|
||||
|
||||
def test_with_steps(self):
|
||||
s = AgenticStep(step_num=1, description="A", result="B", status="completed", duration_ms=5)
|
||||
result = AgenticResult(task_id="x", task="T", summary="S", steps=[s])
|
||||
assert len(result.steps) == 1
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _parse_steps — pure function, highly testable
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestParseSteps:
|
||||
"""Unit tests for the plan parser."""
|
||||
|
||||
def test_numbered_with_dots(self):
|
||||
text = "1. First step\n2. Second step\n3. Third step"
|
||||
steps = _parse_steps(text)
|
||||
assert steps == ["First step", "Second step", "Third step"]
|
||||
|
||||
def test_numbered_with_parens(self):
|
||||
text = "1) Do this\n2) Do that"
|
||||
steps = _parse_steps(text)
|
||||
assert steps == ["Do this", "Do that"]
|
||||
|
||||
def test_mixed_numbering(self):
|
||||
text = "1. Step one\n2) Step two\n3. Step three"
|
||||
steps = _parse_steps(text)
|
||||
assert len(steps) == 3
|
||||
|
||||
def test_indented_steps(self):
|
||||
text = " 1. Indented step\n 2. Also indented"
|
||||
steps = _parse_steps(text)
|
||||
assert len(steps) == 2
|
||||
assert steps[0] == "Indented step"
|
||||
|
||||
def test_no_numbered_steps_fallback(self):
|
||||
text = "Do this first\nThen do that\nFinally wrap up"
|
||||
steps = _parse_steps(text)
|
||||
assert len(steps) == 3
|
||||
assert steps[0] == "Do this first"
|
||||
|
||||
def test_empty_string(self):
|
||||
steps = _parse_steps("")
|
||||
assert steps == []
|
||||
|
||||
def test_blank_lines_ignored_in_fallback(self):
|
||||
text = "Step A\n\n\nStep B\n"
|
||||
steps = _parse_steps(text)
|
||||
assert steps == ["Step A", "Step B"]
|
||||
|
||||
def test_strips_whitespace(self):
|
||||
text = "1. Lots of space \n2. Also spaced "
|
||||
steps = _parse_steps(text)
|
||||
assert steps[0] == "Lots of space"
|
||||
assert steps[1] == "Also spaced"
|
||||
|
||||
def test_preamble_ignored_when_numbered(self):
|
||||
text = "Here is the plan:\n1. Step one\n2. Step two"
|
||||
steps = _parse_steps(text)
|
||||
assert steps == ["Step one", "Step two"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _get_loop_agent — singleton pattern
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestGetLoopAgent:
|
||||
"""Tests for the agent singleton."""
|
||||
|
||||
def test_creates_agent_once(self):
|
||||
import timmy.agentic_loop as mod
|
||||
|
||||
mod._loop_agent = None
|
||||
mock_agent = MagicMock()
|
||||
with patch("timmy.agent.create_timmy", return_value=mock_agent) as mock_create:
|
||||
agent = mod._get_loop_agent()
|
||||
assert agent is mock_agent
|
||||
mock_create.assert_called_once()
|
||||
|
||||
# Second call should reuse singleton
|
||||
agent2 = mod._get_loop_agent()
|
||||
assert agent2 is mock_agent
|
||||
mock_create.assert_called_once()
|
||||
|
||||
mod._loop_agent = None # cleanup
|
||||
|
||||
def test_reuses_existing(self):
|
||||
import timmy.agentic_loop as mod
|
||||
|
||||
sentinel = MagicMock()
|
||||
mod._loop_agent = sentinel
|
||||
assert mod._get_loop_agent() is sentinel
|
||||
mod._loop_agent = None # cleanup
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _broadcast_progress — best-effort WebSocket broadcast
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestBroadcastProgress:
|
||||
"""Tests for the WebSocket broadcast helper."""
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_successful_broadcast(self):
|
||||
from timmy.agentic_loop import _broadcast_progress
|
||||
|
||||
mock_ws = MagicMock()
|
||||
mock_ws.broadcast = AsyncMock()
|
||||
mock_module = MagicMock()
|
||||
mock_module.ws_manager = mock_ws
|
||||
with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": mock_module}):
|
||||
await _broadcast_progress("test.event", {"key": "value"})
|
||||
mock_ws.broadcast.assert_awaited_once_with("test.event", {"key": "value"})
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_import_error_swallowed(self):
|
||||
"""When ws_manager import fails, broadcast silently succeeds."""
|
||||
import sys
|
||||
|
||||
from timmy.agentic_loop import _broadcast_progress
|
||||
|
||||
# Remove the module so import fails
|
||||
saved = sys.modules.pop("infrastructure.ws_manager.handler", None)
|
||||
try:
|
||||
with patch.dict("sys.modules", {"infrastructure": None}):
|
||||
# Should not raise — errors are swallowed
|
||||
await _broadcast_progress("fail.event", {})
|
||||
finally:
|
||||
if saved is not None:
|
||||
sys.modules["infrastructure.ws_manager.handler"] = saved
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_agentic_loop — integration-style tests with mocked agent
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class TestRunAgenticLoop:
|
||||
"""Tests for the main agentic loop."""
|
||||
|
||||
@pytest.fixture(autouse=True)
|
||||
def _reset_agent(self):
|
||||
import timmy.agentic_loop as mod
|
||||
|
||||
mod._loop_agent = None
|
||||
yield
|
||||
mod._loop_agent = None
|
||||
|
||||
def _mock_agent(self, responses):
|
||||
"""Create a mock agent that returns responses in sequence."""
|
||||
agent = MagicMock()
|
||||
run_results = []
|
||||
for r in responses:
|
||||
mock_result = MagicMock()
|
||||
mock_result.content = r
|
||||
run_results.append(mock_result)
|
||||
agent.run = MagicMock(side_effect=run_results)
|
||||
return agent
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_successful_two_step_task(self):
|
||||
from timmy.agentic_loop import run_agentic_loop
|
||||
|
||||
agent = self._mock_agent(
|
||||
[
|
||||
"1. Step one\n2. Step two", # planning
|
||||
"Step one done", # execution step 1
|
||||
"Step two done", # execution step 2
|
||||
]
|
||||
)
|
||||
|
||||
with (
|
||||
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
|
||||
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
|
||||
patch("timmy.session._clean_response", side_effect=lambda x: x),
|
||||
):
|
||||
result = await run_agentic_loop("Test task", max_steps=5)
|
||||
|
||||
assert result.status == "completed"
|
||||
assert len(result.steps) == 2
|
||||
assert result.steps[0].status == "completed"
|
||||
assert result.steps[1].status == "completed"
|
||||
assert result.total_duration_ms >= 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_planning_failure(self):
|
||||
from timmy.agentic_loop import run_agentic_loop
|
||||
|
||||
agent = MagicMock()
|
||||
agent.run = MagicMock(side_effect=RuntimeError("LLM down"))
|
||||
|
||||
with (
|
||||
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
|
||||
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
|
||||
):
|
||||
result = await run_agentic_loop("Broken task", max_steps=3)
|
||||
|
||||
assert result.status == "failed"
|
||||
assert "Planning failed" in result.summary
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_empty_plan(self):
|
||||
from timmy.agentic_loop import run_agentic_loop
|
||||
|
||||
agent = self._mock_agent([""]) # empty plan
|
||||
|
||||
with (
|
||||
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
|
||||
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
|
||||
):
|
||||
result = await run_agentic_loop("Empty plan task", max_steps=3)
|
||||
|
||||
assert result.status == "failed"
|
||||
assert "no steps" in result.summary.lower()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_step_failure_triggers_adaptation(self):
|
||||
from timmy.agentic_loop import run_agentic_loop
|
||||
|
||||
agent = MagicMock()
|
||||
call_count = 0
|
||||
|
||||
def mock_run(prompt, **kwargs):
|
||||
nonlocal call_count
|
||||
call_count += 1
|
||||
result = MagicMock()
|
||||
if call_count == 1:
|
||||
result.content = "1. Only step"
|
||||
elif call_count == 2:
|
||||
raise RuntimeError("Step failed")
|
||||
else:
|
||||
result.content = "Adapted successfully"
|
||||
return result
|
||||
|
||||
agent.run = mock_run
|
||||
|
||||
with (
|
||||
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
|
||||
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
|
||||
patch("timmy.session._clean_response", side_effect=lambda x: x),
|
||||
):
|
||||
result = await run_agentic_loop("Failing task", max_steps=5)
|
||||
|
||||
assert len(result.steps) == 1
|
||||
assert result.steps[0].status == "adapted"
|
||||
assert "[Adapted]" in result.steps[0].description
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_max_steps_truncation(self):
|
||||
from timmy.agentic_loop import run_agentic_loop
|
||||
|
||||
agent = self._mock_agent(
|
||||
[
|
||||
"1. A\n2. B\n3. C\n4. D\n5. E", # 5 steps planned
|
||||
"Done A",
|
||||
"Done B",
|
||||
]
|
||||
)
|
||||
|
||||
with (
|
||||
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
|
||||
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
|
||||
patch("timmy.session._clean_response", side_effect=lambda x: x),
|
||||
):
|
||||
result = await run_agentic_loop("Big task", max_steps=2)
|
||||
|
||||
assert result.status == "partial" # was truncated
|
||||
assert len(result.steps) == 2
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_on_progress_callback(self):
|
||||
from timmy.agentic_loop import run_agentic_loop
|
||||
|
||||
agent = self._mock_agent(
|
||||
[
|
||||
"1. Only step",
|
||||
"Step done",
|
||||
]
|
||||
)
|
||||
progress_calls = []
|
||||
|
||||
async def track_progress(desc, step_num, total):
|
||||
progress_calls.append((desc, step_num, total))
|
||||
|
||||
with (
|
||||
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
|
||||
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
|
||||
patch("timmy.session._clean_response", side_effect=lambda x: x),
|
||||
):
|
||||
await run_agentic_loop("Callback task", max_steps=5, on_progress=track_progress)
|
||||
|
||||
assert len(progress_calls) == 1
|
||||
assert progress_calls[0][1] == 1 # step_num
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_default_max_steps_from_settings(self):
|
||||
from timmy.agentic_loop import run_agentic_loop
|
||||
|
||||
agent = self._mock_agent(["1. Step one", "Done"])
|
||||
|
||||
mock_settings = MagicMock()
|
||||
mock_settings.max_agent_steps = 7
|
||||
|
||||
with (
|
||||
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
|
||||
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
|
||||
patch("timmy.session._clean_response", side_effect=lambda x: x),
|
||||
patch("config.settings", mock_settings),
|
||||
):
|
||||
result = await run_agentic_loop("Settings task")
|
||||
|
||||
assert result.status == "completed"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_task_id_generated(self):
|
||||
from timmy.agentic_loop import run_agentic_loop
|
||||
|
||||
agent = self._mock_agent(["1. Step", "OK"])
|
||||
|
||||
with (
|
||||
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
|
||||
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
|
||||
patch("timmy.session._clean_response", side_effect=lambda x: x),
|
||||
):
|
||||
result = await run_agentic_loop("ID task", max_steps=5)
|
||||
|
||||
assert result.task_id # non-empty
|
||||
assert len(result.task_id) == 8 # uuid[:8]
|
||||
Reference in New Issue
Block a user