diff --git a/tests/integrations/test_agentic_ws_broadcast.py b/tests/integrations/test_agentic_ws_broadcast.py new file mode 100644 index 0000000..e9b236b --- /dev/null +++ b/tests/integrations/test_agentic_ws_broadcast.py @@ -0,0 +1,285 @@ +"""Integration tests for agentic loop WebSocket broadcasts. + +Verifies that ``run_agentic_loop`` pushes the correct sequence of events +through the real ``ws_manager`` and that connected (mock) WebSocket clients +receive every broadcast with the expected payloads. +""" + +import json +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from infrastructure.ws_manager.handler import WebSocketManager +from timmy.agentic_loop import run_agentic_loop + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + + +def _mock_run(content: str): + m = MagicMock() + m.content = content + return m + + +def _ws_client() -> AsyncMock: + """Return a fake WebSocket that records sent messages.""" + return AsyncMock() + + +def _collected_events(ws: AsyncMock) -> list[dict]: + """Extract parsed JSON events from a mock WebSocket's send_text calls.""" + return [json.loads(call.args[0]) for call in ws.send_text.call_args_list] + + +# --------------------------------------------------------------------------- +# Tests +# --------------------------------------------------------------------------- + + +class TestAgenticLoopBroadcastSequence: + """Events arrive at WS clients in the correct order with expected data.""" + + @pytest.mark.asyncio + async def test_successful_run_broadcasts_plan_steps_complete(self): + """A successful 2-step loop emits plan_ready → 2× step_complete → task_complete.""" + mgr = WebSocketManager() + ws = _ws_client() + mgr._connections = [ws] + + mock_agent = MagicMock() + mock_agent.run = MagicMock( + side_effect=[ + _mock_run("1. Gather data\n2. Summarise"), + _mock_run("Gathered 10 records"), + _mock_run("Summary written"), + ] + ) + + with ( + patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), + patch("infrastructure.ws_manager.handler.ws_manager", mgr), + ): + result = await run_agentic_loop("Gather and summarise", max_steps=2) + + assert result.status == "completed" + + events = _collected_events(ws) + event_names = [e["event"] for e in events] + assert event_names == [ + "agentic.plan_ready", + "agentic.step_complete", + "agentic.step_complete", + "agentic.task_complete", + ] + + @pytest.mark.asyncio + async def test_plan_ready_payload(self): + """plan_ready contains task_id, task, steps list, and total count.""" + mgr = WebSocketManager() + ws = _ws_client() + mgr._connections = [ws] + + mock_agent = MagicMock() + mock_agent.run = MagicMock( + side_effect=[ + _mock_run("1. Alpha\n2. Beta"), + _mock_run("Alpha done"), + _mock_run("Beta done"), + ] + ) + + with ( + patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), + patch("infrastructure.ws_manager.handler.ws_manager", mgr), + ): + result = await run_agentic_loop("Two steps") + + plan_event = _collected_events(ws)[0] + assert plan_event["event"] == "agentic.plan_ready" + data = plan_event["data"] + assert data["task_id"] == result.task_id + assert data["task"] == "Two steps" + assert data["steps"] == ["Alpha", "Beta"] + assert data["total"] == 2 + + @pytest.mark.asyncio + async def test_step_complete_payload(self): + """step_complete carries step number, total, description, and result.""" + mgr = WebSocketManager() + ws = _ws_client() + mgr._connections = [ws] + + mock_agent = MagicMock() + mock_agent.run = MagicMock( + side_effect=[ + _mock_run("1. Only step"), + _mock_run("Step result text"), + ] + ) + + with ( + patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), + patch("infrastructure.ws_manager.handler.ws_manager", mgr), + ): + await run_agentic_loop("Single step", max_steps=1) + + step_event = _collected_events(ws)[1] + assert step_event["event"] == "agentic.step_complete" + data = step_event["data"] + assert data["step"] == 1 + assert data["total"] == 1 + assert data["description"] == "Only step" + assert "Step result text" in data["result"] + + @pytest.mark.asyncio + async def test_task_complete_payload(self): + """task_complete has status, steps_completed, summary, and duration_ms.""" + mgr = WebSocketManager() + ws = _ws_client() + mgr._connections = [ws] + + mock_agent = MagicMock() + mock_agent.run = MagicMock( + side_effect=[ + _mock_run("1. Do it"), + _mock_run("Done"), + ] + ) + + with ( + patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), + patch("infrastructure.ws_manager.handler.ws_manager", mgr), + ): + await run_agentic_loop("Quick", max_steps=1) + + complete_event = _collected_events(ws)[-1] + assert complete_event["event"] == "agentic.task_complete" + data = complete_event["data"] + assert data["status"] == "completed" + assert data["steps_completed"] == 1 + assert isinstance(data["duration_ms"], int) + assert data["duration_ms"] >= 0 + assert data["summary"] + + +class TestAdaptationBroadcast: + """Adapted steps emit step_adapted events.""" + + @pytest.mark.asyncio + async def test_adapted_step_broadcasts_step_adapted(self): + """A failed-then-adapted step emits agentic.step_adapted.""" + mgr = WebSocketManager() + ws = _ws_client() + mgr._connections = [ws] + + mock_agent = MagicMock() + mock_agent.run = MagicMock( + side_effect=[ + _mock_run("1. Risky step"), + Exception("disk full"), + _mock_run("Used /tmp instead"), + ] + ) + + with ( + patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), + patch("infrastructure.ws_manager.handler.ws_manager", mgr), + ): + result = await run_agentic_loop("Adapt test", max_steps=1) + + events = _collected_events(ws) + event_names = [e["event"] for e in events] + assert "agentic.step_adapted" in event_names + + adapted = next(e for e in events if e["event"] == "agentic.step_adapted") + assert adapted["data"]["error"] == "disk full" + assert adapted["data"]["adaptation"] + assert result.steps[0].status == "adapted" + + +class TestMultipleClients: + """All connected clients receive every broadcast.""" + + @pytest.mark.asyncio + async def test_two_clients_receive_all_events(self): + mgr = WebSocketManager() + ws1 = _ws_client() + ws2 = _ws_client() + mgr._connections = [ws1, ws2] + + mock_agent = MagicMock() + mock_agent.run = MagicMock( + side_effect=[ + _mock_run("1. Step A"), + _mock_run("A done"), + ] + ) + + with ( + patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), + patch("infrastructure.ws_manager.handler.ws_manager", mgr), + ): + await run_agentic_loop("Multi-client", max_steps=1) + + events1 = _collected_events(ws1) + events2 = _collected_events(ws2) + assert len(events1) == len(events2) == 3 # plan + step + complete + assert [e["event"] for e in events1] == [e["event"] for e in events2] + + +class TestEventHistory: + """Broadcasts are recorded in ws_manager event history.""" + + @pytest.mark.asyncio + async def test_events_appear_in_history(self): + mgr = WebSocketManager() + + mock_agent = MagicMock() + mock_agent.run = MagicMock( + side_effect=[ + _mock_run("1. Only"), + _mock_run("Done"), + ] + ) + + with ( + patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), + patch("infrastructure.ws_manager.handler.ws_manager", mgr), + ): + await run_agentic_loop("History test", max_steps=1) + + history_events = [e.event for e in mgr.event_history] + assert "agentic.plan_ready" in history_events + assert "agentic.step_complete" in history_events + assert "agentic.task_complete" in history_events + + +class TestBroadcastGracefulDegradation: + """Loop completes even when ws_manager is unavailable.""" + + @pytest.mark.asyncio + async def test_loop_succeeds_when_broadcast_fails(self): + """ImportError from ws_manager doesn't crash the loop.""" + mock_agent = MagicMock() + mock_agent.run = MagicMock( + side_effect=[ + _mock_run("1. Do it"), + _mock_run("Done"), + ] + ) + + with ( + patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent), + patch( + "infrastructure.ws_manager.handler.ws_manager", + new_callable=lambda: MagicMock, + ) as broken_mgr, + ): + broken_mgr.broadcast = AsyncMock(side_effect=RuntimeError("ws down")) + result = await run_agentic_loop("Resilient task", max_steps=1) + + assert result.status == "completed" + assert len(result.steps) == 1