"""Unit tests for timmy.agentic_loop — agentic loop data structures, parsing, and execution.""" from unittest.mock import AsyncMock, MagicMock, patch import pytest from timmy.agentic_loop import ( AgenticResult, AgenticStep, _broadcast_progress, _parse_steps, run_agentic_loop, ) # ── Data structures ────────────────────────────────────────────────────────── class TestAgenticStep: def test_fields(self): step = AgenticStep( step_num=1, description="Do something", result="Done", status="completed", duration_ms=42, ) assert step.step_num == 1 assert step.description == "Do something" assert step.result == "Done" assert step.status == "completed" assert step.duration_ms == 42 class TestAgenticResult: def test_defaults(self): r = AgenticResult(task_id="abc", task="test task", summary="ok") assert r.steps == [] assert r.status == "completed" assert r.total_duration_ms == 0 def test_with_steps(self): step = AgenticStep(1, "s", "r", "completed", 10) r = AgenticResult(task_id="x", task="t", summary="s", steps=[step]) assert len(r.steps) == 1 # ── _parse_steps ───────────────────────────────────────────────────────────── class TestParseSteps: def test_numbered_dot(self): text = "1. First step\n2. Second step\n3. Third step" assert _parse_steps(text) == ["First step", "Second step", "Third step"] def test_numbered_paren(self): text = "1) Alpha\n2) Beta" assert _parse_steps(text) == ["Alpha", "Beta"] def test_mixed_whitespace(self): text = " 1. Indented step\n 2. Another " result = _parse_steps(text) assert result == ["Indented step", "Another"] def test_fallback_plain_lines(self): text = "Do this\nDo that\nDo the other" assert _parse_steps(text) == ["Do this", "Do that", "Do the other"] def test_empty_string(self): assert _parse_steps("") == [] def test_blank_lines_skipped_in_fallback(self): text = "line one\n\nline two\n \nline three" assert _parse_steps(text) == ["line one", "line two", "line three"] # ── _get_loop_agent ────────────────────────────────────────────────────────── class TestGetLoopAgent: def test_creates_agent_once(self): import timmy.agentic_loop as al saved = al._loop_agent try: al._loop_agent = None mock_agent = MagicMock() with patch("timmy.agent.create_timmy", return_value=mock_agent): result = al._get_loop_agent() assert result is mock_agent # Second call returns cached result2 = al._get_loop_agent() assert result2 is mock_agent finally: al._loop_agent = saved def test_returns_cached(self): import timmy.agentic_loop as al saved = al._loop_agent try: sentinel = object() al._loop_agent = sentinel assert al._get_loop_agent() is sentinel finally: al._loop_agent = saved def test_thread_safe_creation(self): """Concurrent calls must only create one agent (thread-safety).""" import threading import timmy.agentic_loop as al saved = al._loop_agent try: al._loop_agent = None mock_agent = MagicMock() call_count = 0 barrier = threading.Barrier(4) original_create = MagicMock(return_value=mock_agent) def slow_create(): nonlocal call_count call_count += 1 return original_create() results = [None] * 4 def worker(idx): barrier.wait() results[idx] = al._get_loop_agent() with patch("timmy.agent.create_timmy", side_effect=slow_create): threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)] for t in threads: t.start() for t in threads: t.join() # All threads got the same agent assert all(r is mock_agent for r in results) # create_timmy called exactly once assert call_count == 1 finally: al._loop_agent = saved # ── _broadcast_progress ────────────────────────────────────────────────────── class TestBroadcastProgress: @pytest.mark.asyncio async def test_success(self): mock_ws = AsyncMock() with ( patch("timmy.agentic_loop.ws_manager", mock_ws, create=True), patch.dict( "sys.modules", {"infrastructure.ws_manager.handler": MagicMock(ws_manager=mock_ws)}, ), ): await _broadcast_progress("test.event", {"key": "val"}) mock_ws.broadcast.assert_awaited_once_with("test.event", {"key": "val"}) @pytest.mark.asyncio async def test_import_error_swallowed(self): with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": None}): # Should not raise await _broadcast_progress("test.event", {}) # ── run_agentic_loop ───────────────────────────────────────────────────────── def _make_mock_agent(plan_text, step_responses=None): """Create a mock agent whose .run returns predictable content.""" call_count = 0 def run_side_effect(prompt, *, stream=False, session_id=""): nonlocal call_count call_count += 1 resp = MagicMock() if call_count == 1: # Planning call resp.content = plan_text else: idx = call_count - 2 # step index (0-based) if step_responses and idx < len(step_responses): val = step_responses[idx] if isinstance(val, Exception): raise val resp.content = val else: resp.content = f"Step result {call_count}" return resp agent = MagicMock() agent.run = MagicMock(side_effect=run_side_effect) return agent @pytest.fixture def _patch_broadcast(): with patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock): yield @pytest.fixture def _patch_clean_response(): with patch("timmy.session._clean_response", side_effect=lambda x: x): yield class TestRunAgenticLoop: @pytest.mark.asyncio async def test_successful_execution(self, _patch_broadcast, _patch_clean_response): agent = _make_mock_agent("1. Step A\n2. Step B", ["Result A", "Result B"]) mock_settings = MagicMock() mock_settings.max_agent_steps = 10 with ( patch("timmy.agentic_loop._get_loop_agent", return_value=agent), patch("timmy.agentic_loop.settings", mock_settings, create=True), patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}), ): result = await run_agentic_loop("do stuff", max_steps=5) assert result.status == "completed" assert len(result.steps) == 2 assert result.steps[0].status == "completed" assert result.steps[0].description == "Step A" assert result.total_duration_ms >= 0 @pytest.mark.asyncio async def test_planning_failure(self, _patch_broadcast): agent = MagicMock() agent.run = MagicMock(side_effect=RuntimeError("LLM down")) mock_settings = MagicMock() mock_settings.max_agent_steps = 5 with ( patch("timmy.agentic_loop._get_loop_agent", return_value=agent), patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}), ): result = await run_agentic_loop("do stuff", max_steps=3) assert result.status == "failed" assert "Planning failed" in result.summary @pytest.mark.asyncio async def test_empty_plan(self, _patch_broadcast): agent = _make_mock_agent("") mock_settings = MagicMock() mock_settings.max_agent_steps = 5 with ( patch("timmy.agentic_loop._get_loop_agent", return_value=agent), patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}), ): result = await run_agentic_loop("do stuff", max_steps=3) assert result.status == "failed" assert "no steps" in result.summary.lower() @pytest.mark.asyncio async def test_step_failure_triggers_adaptation(self, _patch_broadcast, _patch_clean_response): agent = _make_mock_agent( "1. Do X\n2. Do Y", [RuntimeError("oops"), "Adapted result", "Y done"], ) mock_settings = MagicMock() mock_settings.max_agent_steps = 10 with ( patch("timmy.agentic_loop._get_loop_agent", return_value=agent), patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}), ): result = await run_agentic_loop("do stuff", max_steps=5) # Step 1 should be adapted, step 2 completed statuses = [s.status for s in result.steps] assert "adapted" in statuses @pytest.mark.asyncio async def test_truncation_marks_partial(self, _patch_broadcast, _patch_clean_response): agent = _make_mock_agent( "1. A\n2. B\n3. C\n4. D\n5. E", ["r1", "r2"], ) mock_settings = MagicMock() mock_settings.max_agent_steps = 10 with ( patch("timmy.agentic_loop._get_loop_agent", return_value=agent), patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}), ): result = await run_agentic_loop("do stuff", max_steps=2) assert result.status == "partial" @pytest.mark.asyncio async def test_on_progress_callback(self, _patch_broadcast, _patch_clean_response): agent = _make_mock_agent("1. Only step", ["done"]) mock_settings = MagicMock() mock_settings.max_agent_steps = 10 callback = AsyncMock() with ( patch("timmy.agentic_loop._get_loop_agent", return_value=agent), patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}), ): result = await run_agentic_loop("do stuff", max_steps=5, on_progress=callback) callback.assert_awaited_once_with("Only step", 1, 1) assert result.status == "completed" @pytest.mark.asyncio async def test_default_max_steps_from_settings(self, _patch_broadcast, _patch_clean_response): agent = _make_mock_agent("1. S1", ["r1"]) mock_settings = MagicMock() mock_settings.max_agent_steps = 3 with ( patch("timmy.agentic_loop._get_loop_agent", return_value=agent), patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}), ): result = await run_agentic_loop("do stuff") # max_steps=0 → from settings assert result.status == "completed" @pytest.mark.asyncio async def test_failed_step_and_failed_adaptation(self, _patch_broadcast, _patch_clean_response): """When both step and adaptation fail, step is marked failed.""" call_count = 0 def run_side_effect(prompt, *, stream=False, session_id=""): nonlocal call_count call_count += 1 if call_count == 1: resp = MagicMock() resp.content = "1. Only step" return resp # Both step execution and adaptation fail raise RuntimeError("everything broken") agent = MagicMock() agent.run = MagicMock(side_effect=run_side_effect) mock_settings = MagicMock() mock_settings.max_agent_steps = 10 with ( patch("timmy.agentic_loop._get_loop_agent", return_value=agent), patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}), ): result = await run_agentic_loop("do stuff", max_steps=5) assert result.steps[0].status == "failed" assert "Failed" in result.steps[0].result assert result.status == "partial"