"""Tests for timmy.agentic_loop — multi-step task execution engine."""

from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from timmy.agentic_loop import (
    AgenticResult,
    AgenticStep,
    _parse_steps,
)


@pytest.fixture
def reset_loop_agent():
    """Clear the module-level ``_loop_agent`` singleton around a test.

    The reset after ``yield`` runs as fixture teardown, so it happens even
    when the test body fails; a mock agent installed by one test therefore
    cannot leak into the next one.
    """
    import timmy.agentic_loop as mod

    mod._loop_agent = None
    yield
    mod._loop_agent = None


# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------


class TestAgenticStep:
    """Unit tests for the AgenticStep dataclass."""

    def test_creation(self):
        """Every constructor argument is stored on the matching attribute."""
        step = AgenticStep(
            step_num=1,
            description="Do thing",
            result="Done",
            status="completed",
            duration_ms=42,
        )
        assert step.step_num == 1
        assert step.description == "Do thing"
        assert step.result == "Done"
        assert step.status == "completed"
        assert step.duration_ms == 42

    def test_failed_status(self):
        """A step can be created with status ``"failed"``."""
        step = AgenticStep(
            step_num=2, description="Bad step", result="Error", status="failed", duration_ms=10
        )
        assert step.status == "failed"

    def test_adapted_status(self):
        """A step can be created with status ``"adapted"``."""
        step = AgenticStep(
            step_num=3, description="Retried", result="OK", status="adapted", duration_ms=100
        )
        assert step.status == "adapted"


class TestAgenticResult:
    """Unit tests for the AgenticResult dataclass."""

    def test_defaults(self):
        """Omitted fields default to no steps, ``"completed"`` and 0 ms."""
        result = AgenticResult(task_id="abc", task="Test", summary="Done")
        assert result.steps == []
        assert result.status == "completed"
        assert result.total_duration_ms == 0

    def test_with_steps(self):
        """Steps passed to the constructor are kept on the result."""
        s = AgenticStep(step_num=1, description="A", result="B", status="completed", duration_ms=5)
        result = AgenticResult(task_id="x", task="T", summary="S", steps=[s])
        assert len(result.steps) == 1


# ---------------------------------------------------------------------------
# _parse_steps — pure function, highly testable
# ---------------------------------------------------------------------------


class TestParseSteps:
    """Unit tests for the plan parser."""

    def test_numbered_with_dots(self):
        """``N.`` prefixes are stripped and the step text is returned in order."""
        text = "1. First step\n2. Second step\n3. Third step"
        steps = _parse_steps(text)
        assert steps == ["First step", "Second step", "Third step"]

    def test_numbered_with_parens(self):
        """``N)`` prefixes are accepted as well."""
        text = "1) Do this\n2) Do that"
        steps = _parse_steps(text)
        assert steps == ["Do this", "Do that"]

    def test_mixed_numbering(self):
        """Dot and paren numbering may be mixed within one plan."""
        text = "1. Step one\n2) Step two\n3. Step three"
        steps = _parse_steps(text)
        assert len(steps) == 3

    def test_indented_steps(self):
        """Leading indentation before the number does not hide a step."""
        text = "  1. Indented step\n  2. Also indented"
        steps = _parse_steps(text)
        assert len(steps) == 2
        assert steps[0] == "Indented step"

    def test_no_numbered_steps_fallback(self):
        """Without any numbered line, each non-blank line becomes a step."""
        text = "Do this first\nThen do that\nFinally wrap up"
        steps = _parse_steps(text)
        assert len(steps) == 3
        assert steps[0] == "Do this first"

    def test_empty_string(self):
        """An empty plan yields an empty list."""
        steps = _parse_steps("")
        assert steps == []

    def test_blank_lines_ignored_in_fallback(self):
        """Blank lines are dropped in the line-per-step fallback."""
        text = "Step A\n\n\nStep B\n"
        steps = _parse_steps(text)
        assert steps == ["Step A", "Step B"]

    def test_strips_whitespace(self):
        """Whitespace around the step text is stripped."""
        text = "1.   Lots of space   \n2.  Also spaced  "
        steps = _parse_steps(text)
        assert steps[0] == "Lots of space"
        assert steps[1] == "Also spaced"

    def test_preamble_ignored_when_numbered(self):
        """Un-numbered preamble is discarded once numbered steps exist."""
        text = "Here is the plan:\n1. Step one\n2. Step two"
        steps = _parse_steps(text)
        assert steps == ["Step one", "Step two"]


# ---------------------------------------------------------------------------
# _get_loop_agent — singleton pattern
# ---------------------------------------------------------------------------


@pytest.mark.usefixtures("reset_loop_agent")
class TestGetLoopAgent:
    """Tests for the agent singleton."""

    def test_creates_agent_once(self):
        """The factory is called on first use only; later calls reuse the agent."""
        import timmy.agentic_loop as mod

        mock_agent = MagicMock()
        with patch("timmy.agent.create_timmy", return_value=mock_agent) as mock_create:
            agent = mod._get_loop_agent()
            assert agent is mock_agent
            mock_create.assert_called_once()

            # Second call should reuse singleton
            agent2 = mod._get_loop_agent()
            assert agent2 is mock_agent
            mock_create.assert_called_once()

    def test_reuses_existing(self):
        """An agent already stored in ``_loop_agent`` is returned as-is."""
        import timmy.agentic_loop as mod

        sentinel = MagicMock()
        mod._loop_agent = sentinel
        assert mod._get_loop_agent() is sentinel


# ---------------------------------------------------------------------------
# _broadcast_progress — best-effort WebSocket broadcast
# ---------------------------------------------------------------------------


class TestBroadcastProgress:
    """Tests for the WebSocket broadcast helper."""

    @pytest.mark.asyncio
    async def test_successful_broadcast(self):
        """The event name and payload are forwarded to ``ws_manager.broadcast``."""
        from timmy.agentic_loop import _broadcast_progress

        mock_ws = MagicMock()
        mock_ws.broadcast = AsyncMock()
        mock_module = MagicMock()
        mock_module.ws_manager = mock_ws
        # Substitute the handler module so the helper's import resolves to the mock.
        with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": mock_module}):
            await _broadcast_progress("test.event", {"key": "value"})
            mock_ws.broadcast.assert_awaited_once_with("test.event", {"key": "value"})

    @pytest.mark.asyncio
    async def test_import_error_swallowed(self):
        """When ws_manager import fails, broadcast silently succeeds."""
        import sys

        from timmy.agentic_loop import _broadcast_progress

        # A ``None`` entry in sys.modules makes importing that name raise
        # ImportError. patch.dict snapshots sys.modules on entry and restores
        # the snapshot on exit, so the handler entry popped below comes back
        # automatically — no manual save/restore is needed.
        with patch.dict("sys.modules", {"infrastructure": None}):
            # Drop any cached handler so the import cannot be satisfied
            # straight from sys.modules without touching the parent package.
            sys.modules.pop("infrastructure.ws_manager.handler", None)
            # Should not raise — errors are swallowed
            await _broadcast_progress("fail.event", {})


# ---------------------------------------------------------------------------
# run_agentic_loop — integration-style tests with mocked agent
# ---------------------------------------------------------------------------


@pytest.mark.usefixtures("reset_loop_agent")
class TestRunAgenticLoop:
    """Tests for the main agentic loop."""

    def _mock_agent(self, responses):
        """Create a mock agent that returns responses in sequence.

        Each call to ``agent.run`` yields the next object whose ``content``
        attribute is the corresponding entry of ``responses``.
        """
        agent = MagicMock()
        agent.run = MagicMock(side_effect=[MagicMock(content=text) for text in responses])
        return agent

    @pytest.mark.asyncio
    async def test_successful_two_step_task(self):
        """A two-step plan runs both steps and finishes as ``"completed"``."""
        from timmy.agentic_loop import run_agentic_loop

        agent = self._mock_agent(
            [
                "1. Step one\n2. Step two",  # planning
                "Step one done",  # execution step 1
                "Step two done",  # execution step 2
            ]
        )

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            result = await run_agentic_loop("Test task", max_steps=5)

        assert result.status == "completed"
        assert len(result.steps) == 2
        assert result.steps[0].status == "completed"
        assert result.steps[1].status == "completed"
        assert result.total_duration_ms >= 0

    @pytest.mark.asyncio
    async def test_planning_failure(self):
        """An exception during planning yields a ``"failed"`` result."""
        from timmy.agentic_loop import run_agentic_loop

        agent = MagicMock()
        agent.run = MagicMock(side_effect=RuntimeError("LLM down"))

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
        ):
            result = await run_agentic_loop("Broken task", max_steps=3)

        assert result.status == "failed"
        assert "Planning failed" in result.summary

    @pytest.mark.asyncio
    async def test_empty_plan(self):
        """An empty planning response yields a ``"failed"`` result."""
        from timmy.agentic_loop import run_agentic_loop

        agent = self._mock_agent([""])  # empty plan

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
        ):
            result = await run_agentic_loop("Empty plan task", max_steps=3)

        assert result.status == "failed"
        assert "no steps" in result.summary.lower()

    @pytest.mark.asyncio
    async def test_step_failure_triggers_adaptation(self):
        """A step whose first attempt raises is recorded as ``"adapted"``."""
        from timmy.agentic_loop import run_agentic_loop

        agent = MagicMock()
        call_count = 0

        def mock_run(prompt, **kwargs):
            # Call 1 returns the plan, call 2 fails the only step, and every
            # later call succeeds with the adapted answer.
            nonlocal call_count
            call_count += 1
            result = MagicMock()
            if call_count == 1:
                result.content = "1. Only step"
            elif call_count == 2:
                raise RuntimeError("Step failed")
            else:
                result.content = "Adapted successfully"
            return result

        agent.run = mock_run

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            result = await run_agentic_loop("Failing task", max_steps=5)

        assert len(result.steps) == 1
        assert result.steps[0].status == "adapted"
        assert "[Adapted]" in result.steps[0].description

    @pytest.mark.asyncio
    async def test_max_steps_truncation(self):
        """A plan longer than ``max_steps`` is cut short and marked ``"partial"``."""
        from timmy.agentic_loop import run_agentic_loop

        agent = self._mock_agent(
            [
                "1. A\n2. B\n3. C\n4. D\n5. E",  # 5 steps planned
                "Done A",
                "Done B",
            ]
        )

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            result = await run_agentic_loop("Big task", max_steps=2)

        assert result.status == "partial"  # was truncated
        assert len(result.steps) == 2

    @pytest.mark.asyncio
    async def test_on_progress_callback(self):
        """``on_progress`` is awaited once for a one-step plan, with step number 1."""
        from timmy.agentic_loop import run_agentic_loop

        agent = self._mock_agent(
            [
                "1. Only step",
                "Step done",
            ]
        )
        progress_calls = []

        async def track_progress(desc, step_num, total):
            progress_calls.append((desc, step_num, total))

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            await run_agentic_loop("Callback task", max_steps=5, on_progress=track_progress)

        assert len(progress_calls) == 1
        assert progress_calls[0][1] == 1  # step_num

    @pytest.mark.asyncio
    async def test_default_max_steps_from_settings(self):
        """The loop completes when ``max_steps`` is omitted.

        NOTE(review): this only checks that the run completes with
        ``config.settings`` patched; it never asserts that
        ``max_agent_steps`` (7) is the limit actually applied. Consider
        planning more steps than the configured limit and asserting the
        truncation — confirm against the implementation first.
        """
        from timmy.agentic_loop import run_agentic_loop

        agent = self._mock_agent(["1. Step one", "Done"])

        mock_settings = MagicMock()
        mock_settings.max_agent_steps = 7

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
            patch("config.settings", mock_settings),
        ):
            result = await run_agentic_loop("Settings task")

        assert result.status == "completed"

    @pytest.mark.asyncio
    async def test_task_id_generated(self):
        """Each run gets a non-empty, 8-character task id."""
        from timmy.agentic_loop import run_agentic_loop

        agent = self._mock_agent(["1. Step", "OK"])

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            result = await run_agentic_loop("ID task", max_steps=5)

        assert result.task_id  # non-empty
        assert len(result.task_id) == 8  # uuid[:8]