Compare commits
1 Commits
kimi/issue
...
kimi/issue
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
57f4f37a9b |
@@ -68,14 +68,18 @@ def _get_loop_agent():
|
|||||||
|
|
||||||
Returns the same type of agent as `create_timmy()` but with a
|
Returns the same type of agent as `create_timmy()` but with a
|
||||||
dedicated session so it doesn't pollute the main chat history.
|
dedicated session so it doesn't pollute the main chat history.
|
||||||
|
|
||||||
|
Thread-safe: uses a lock to prevent duplicate agent creation
|
||||||
|
when multiple loops start concurrently.
|
||||||
"""
|
"""
|
||||||
global _loop_agent
|
global _loop_agent
|
||||||
if _loop_agent is None:
|
if _loop_agent is not None:
|
||||||
with _loop_agent_lock:
|
return _loop_agent
|
||||||
if _loop_agent is None:
|
with _loop_agent_lock:
|
||||||
from timmy.agent import create_timmy
|
if _loop_agent is None:
|
||||||
|
from timmy.agent import create_timmy
|
||||||
|
|
||||||
_loop_agent = create_timmy()
|
_loop_agent = create_timmy()
|
||||||
return _loop_agent
|
return _loop_agent
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@@ -1,285 +0,0 @@
|
|||||||
"""Integration tests for agentic loop WebSocket broadcasts.
|
|
||||||
|
|
||||||
Verifies that ``run_agentic_loop`` pushes the correct sequence of events
|
|
||||||
through the real ``ws_manager`` and that connected (mock) WebSocket clients
|
|
||||||
receive every broadcast with the expected payloads.
|
|
||||||
"""
|
|
||||||
|
|
||||||
import json
|
|
||||||
from unittest.mock import AsyncMock, MagicMock, patch
|
|
||||||
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from infrastructure.ws_manager.handler import WebSocketManager
|
|
||||||
from timmy.agentic_loop import run_agentic_loop
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Helpers
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
def _mock_run(content: str):
|
|
||||||
m = MagicMock()
|
|
||||||
m.content = content
|
|
||||||
return m
|
|
||||||
|
|
||||||
|
|
||||||
def _ws_client() -> AsyncMock:
|
|
||||||
"""Return a fake WebSocket that records sent messages."""
|
|
||||||
return AsyncMock()
|
|
||||||
|
|
||||||
|
|
||||||
def _collected_events(ws: AsyncMock) -> list[dict]:
|
|
||||||
"""Extract parsed JSON events from a mock WebSocket's send_text calls."""
|
|
||||||
return [json.loads(call.args[0]) for call in ws.send_text.call_args_list]
|
|
||||||
|
|
||||||
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
# Tests
|
|
||||||
# ---------------------------------------------------------------------------
|
|
||||||
|
|
||||||
|
|
||||||
class TestAgenticLoopBroadcastSequence:
|
|
||||||
"""Events arrive at WS clients in the correct order with expected data."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_successful_run_broadcasts_plan_steps_complete(self):
|
|
||||||
"""A successful 2-step loop emits plan_ready → 2× step_complete → task_complete."""
|
|
||||||
mgr = WebSocketManager()
|
|
||||||
ws = _ws_client()
|
|
||||||
mgr._connections = [ws]
|
|
||||||
|
|
||||||
mock_agent = MagicMock()
|
|
||||||
mock_agent.run = MagicMock(
|
|
||||||
side_effect=[
|
|
||||||
_mock_run("1. Gather data\n2. Summarise"),
|
|
||||||
_mock_run("Gathered 10 records"),
|
|
||||||
_mock_run("Summary written"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
|
|
||||||
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
|
|
||||||
):
|
|
||||||
result = await run_agentic_loop("Gather and summarise", max_steps=2)
|
|
||||||
|
|
||||||
assert result.status == "completed"
|
|
||||||
|
|
||||||
events = _collected_events(ws)
|
|
||||||
event_names = [e["event"] for e in events]
|
|
||||||
assert event_names == [
|
|
||||||
"agentic.plan_ready",
|
|
||||||
"agentic.step_complete",
|
|
||||||
"agentic.step_complete",
|
|
||||||
"agentic.task_complete",
|
|
||||||
]
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_plan_ready_payload(self):
|
|
||||||
"""plan_ready contains task_id, task, steps list, and total count."""
|
|
||||||
mgr = WebSocketManager()
|
|
||||||
ws = _ws_client()
|
|
||||||
mgr._connections = [ws]
|
|
||||||
|
|
||||||
mock_agent = MagicMock()
|
|
||||||
mock_agent.run = MagicMock(
|
|
||||||
side_effect=[
|
|
||||||
_mock_run("1. Alpha\n2. Beta"),
|
|
||||||
_mock_run("Alpha done"),
|
|
||||||
_mock_run("Beta done"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
|
|
||||||
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
|
|
||||||
):
|
|
||||||
result = await run_agentic_loop("Two steps")
|
|
||||||
|
|
||||||
plan_event = _collected_events(ws)[0]
|
|
||||||
assert plan_event["event"] == "agentic.plan_ready"
|
|
||||||
data = plan_event["data"]
|
|
||||||
assert data["task_id"] == result.task_id
|
|
||||||
assert data["task"] == "Two steps"
|
|
||||||
assert data["steps"] == ["Alpha", "Beta"]
|
|
||||||
assert data["total"] == 2
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_step_complete_payload(self):
|
|
||||||
"""step_complete carries step number, total, description, and result."""
|
|
||||||
mgr = WebSocketManager()
|
|
||||||
ws = _ws_client()
|
|
||||||
mgr._connections = [ws]
|
|
||||||
|
|
||||||
mock_agent = MagicMock()
|
|
||||||
mock_agent.run = MagicMock(
|
|
||||||
side_effect=[
|
|
||||||
_mock_run("1. Only step"),
|
|
||||||
_mock_run("Step result text"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
|
|
||||||
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
|
|
||||||
):
|
|
||||||
await run_agentic_loop("Single step", max_steps=1)
|
|
||||||
|
|
||||||
step_event = _collected_events(ws)[1]
|
|
||||||
assert step_event["event"] == "agentic.step_complete"
|
|
||||||
data = step_event["data"]
|
|
||||||
assert data["step"] == 1
|
|
||||||
assert data["total"] == 1
|
|
||||||
assert data["description"] == "Only step"
|
|
||||||
assert "Step result text" in data["result"]
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_task_complete_payload(self):
|
|
||||||
"""task_complete has status, steps_completed, summary, and duration_ms."""
|
|
||||||
mgr = WebSocketManager()
|
|
||||||
ws = _ws_client()
|
|
||||||
mgr._connections = [ws]
|
|
||||||
|
|
||||||
mock_agent = MagicMock()
|
|
||||||
mock_agent.run = MagicMock(
|
|
||||||
side_effect=[
|
|
||||||
_mock_run("1. Do it"),
|
|
||||||
_mock_run("Done"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
|
|
||||||
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
|
|
||||||
):
|
|
||||||
await run_agentic_loop("Quick", max_steps=1)
|
|
||||||
|
|
||||||
complete_event = _collected_events(ws)[-1]
|
|
||||||
assert complete_event["event"] == "agentic.task_complete"
|
|
||||||
data = complete_event["data"]
|
|
||||||
assert data["status"] == "completed"
|
|
||||||
assert data["steps_completed"] == 1
|
|
||||||
assert isinstance(data["duration_ms"], int)
|
|
||||||
assert data["duration_ms"] >= 0
|
|
||||||
assert data["summary"]
|
|
||||||
|
|
||||||
|
|
||||||
class TestAdaptationBroadcast:
|
|
||||||
"""Adapted steps emit step_adapted events."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_adapted_step_broadcasts_step_adapted(self):
|
|
||||||
"""A failed-then-adapted step emits agentic.step_adapted."""
|
|
||||||
mgr = WebSocketManager()
|
|
||||||
ws = _ws_client()
|
|
||||||
mgr._connections = [ws]
|
|
||||||
|
|
||||||
mock_agent = MagicMock()
|
|
||||||
mock_agent.run = MagicMock(
|
|
||||||
side_effect=[
|
|
||||||
_mock_run("1. Risky step"),
|
|
||||||
Exception("disk full"),
|
|
||||||
_mock_run("Used /tmp instead"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
|
|
||||||
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
|
|
||||||
):
|
|
||||||
result = await run_agentic_loop("Adapt test", max_steps=1)
|
|
||||||
|
|
||||||
events = _collected_events(ws)
|
|
||||||
event_names = [e["event"] for e in events]
|
|
||||||
assert "agentic.step_adapted" in event_names
|
|
||||||
|
|
||||||
adapted = next(e for e in events if e["event"] == "agentic.step_adapted")
|
|
||||||
assert adapted["data"]["error"] == "disk full"
|
|
||||||
assert adapted["data"]["adaptation"]
|
|
||||||
assert result.steps[0].status == "adapted"
|
|
||||||
|
|
||||||
|
|
||||||
class TestMultipleClients:
|
|
||||||
"""All connected clients receive every broadcast."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_two_clients_receive_all_events(self):
|
|
||||||
mgr = WebSocketManager()
|
|
||||||
ws1 = _ws_client()
|
|
||||||
ws2 = _ws_client()
|
|
||||||
mgr._connections = [ws1, ws2]
|
|
||||||
|
|
||||||
mock_agent = MagicMock()
|
|
||||||
mock_agent.run = MagicMock(
|
|
||||||
side_effect=[
|
|
||||||
_mock_run("1. Step A"),
|
|
||||||
_mock_run("A done"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
|
|
||||||
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
|
|
||||||
):
|
|
||||||
await run_agentic_loop("Multi-client", max_steps=1)
|
|
||||||
|
|
||||||
events1 = _collected_events(ws1)
|
|
||||||
events2 = _collected_events(ws2)
|
|
||||||
assert len(events1) == len(events2) == 3 # plan + step + complete
|
|
||||||
assert [e["event"] for e in events1] == [e["event"] for e in events2]
|
|
||||||
|
|
||||||
|
|
||||||
class TestEventHistory:
|
|
||||||
"""Broadcasts are recorded in ws_manager event history."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_events_appear_in_history(self):
|
|
||||||
mgr = WebSocketManager()
|
|
||||||
|
|
||||||
mock_agent = MagicMock()
|
|
||||||
mock_agent.run = MagicMock(
|
|
||||||
side_effect=[
|
|
||||||
_mock_run("1. Only"),
|
|
||||||
_mock_run("Done"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
|
|
||||||
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
|
|
||||||
):
|
|
||||||
await run_agentic_loop("History test", max_steps=1)
|
|
||||||
|
|
||||||
history_events = [e.event for e in mgr.event_history]
|
|
||||||
assert "agentic.plan_ready" in history_events
|
|
||||||
assert "agentic.step_complete" in history_events
|
|
||||||
assert "agentic.task_complete" in history_events
|
|
||||||
|
|
||||||
|
|
||||||
class TestBroadcastGracefulDegradation:
|
|
||||||
"""Loop completes even when ws_manager is unavailable."""
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
|
||||||
async def test_loop_succeeds_when_broadcast_fails(self):
|
|
||||||
"""ImportError from ws_manager doesn't crash the loop."""
|
|
||||||
mock_agent = MagicMock()
|
|
||||||
mock_agent.run = MagicMock(
|
|
||||||
side_effect=[
|
|
||||||
_mock_run("1. Do it"),
|
|
||||||
_mock_run("Done"),
|
|
||||||
]
|
|
||||||
)
|
|
||||||
|
|
||||||
with (
|
|
||||||
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
|
|
||||||
patch(
|
|
||||||
"infrastructure.ws_manager.handler.ws_manager",
|
|
||||||
new_callable=lambda: MagicMock,
|
|
||||||
) as broken_mgr,
|
|
||||||
):
|
|
||||||
broken_mgr.broadcast = AsyncMock(side_effect=RuntimeError("ws down"))
|
|
||||||
result = await run_agentic_loop("Resilient task", max_steps=1)
|
|
||||||
|
|
||||||
assert result.status == "completed"
|
|
||||||
assert len(result.steps) == 1
|
|
||||||
@@ -104,6 +104,46 @@ class TestGetLoopAgent:
|
|||||||
finally:
|
finally:
|
||||||
al._loop_agent = saved
|
al._loop_agent = saved
|
||||||
|
|
||||||
|
def test_thread_safe_creation(self):
|
||||||
|
"""Concurrent calls must only create one agent (thread-safety)."""
|
||||||
|
import threading
|
||||||
|
|
||||||
|
import timmy.agentic_loop as al
|
||||||
|
|
||||||
|
saved = al._loop_agent
|
||||||
|
try:
|
||||||
|
al._loop_agent = None
|
||||||
|
mock_agent = MagicMock()
|
||||||
|
call_count = 0
|
||||||
|
barrier = threading.Barrier(4)
|
||||||
|
|
||||||
|
original_create = MagicMock(return_value=mock_agent)
|
||||||
|
|
||||||
|
def slow_create():
|
||||||
|
nonlocal call_count
|
||||||
|
call_count += 1
|
||||||
|
return original_create()
|
||||||
|
|
||||||
|
results = [None] * 4
|
||||||
|
|
||||||
|
def worker(idx):
|
||||||
|
barrier.wait()
|
||||||
|
results[idx] = al._get_loop_agent()
|
||||||
|
|
||||||
|
with patch("timmy.agent.create_timmy", side_effect=slow_create):
|
||||||
|
threads = [threading.Thread(target=worker, args=(i,)) for i in range(4)]
|
||||||
|
for t in threads:
|
||||||
|
t.start()
|
||||||
|
for t in threads:
|
||||||
|
t.join()
|
||||||
|
|
||||||
|
# All threads got the same agent
|
||||||
|
assert all(r is mock_agent for r in results)
|
||||||
|
# create_timmy called exactly once
|
||||||
|
assert call_count == 1
|
||||||
|
finally:
|
||||||
|
al._loop_agent = saved
|
||||||
|
|
||||||
|
|
||||||
# ── _broadcast_progress ──────────────────────────────────────────────────────
|
# ── _broadcast_progress ──────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
|||||||
@@ -99,19 +99,26 @@ class TestKeywordOverlap:
|
|||||||
|
|
||||||
|
|
||||||
class TestEmbedText:
|
class TestEmbedText:
|
||||||
|
def setup_method(self):
|
||||||
|
self._saved_model = emb.EMBEDDING_MODEL
|
||||||
|
emb.EMBEDDING_MODEL = None
|
||||||
|
|
||||||
|
def teardown_method(self):
|
||||||
|
emb.EMBEDDING_MODEL = self._saved_model
|
||||||
|
|
||||||
def test_uses_fallback_when_model_disabled(self):
|
def test_uses_fallback_when_model_disabled(self):
|
||||||
with patch.object(emb, "_get_embedding_model", return_value=False):
|
emb.EMBEDDING_MODEL = False
|
||||||
vec = embed_text("test")
|
vec = embed_text("test")
|
||||||
assert len(vec) == 128 # hash fallback dimension
|
assert len(vec) == 128 # hash fallback dimension
|
||||||
|
|
||||||
def test_uses_model_when_available(self):
|
def test_uses_model_when_available(self):
|
||||||
mock_encoding = MagicMock()
|
import numpy as np
|
||||||
mock_encoding.tolist.return_value = [0.1, 0.2, 0.3]
|
|
||||||
mock_model = MagicMock()
|
|
||||||
mock_model.encode.return_value = mock_encoding
|
|
||||||
|
|
||||||
with patch.object(emb, "_get_embedding_model", return_value=mock_model):
|
mock_model = MagicMock()
|
||||||
result = embed_text("test")
|
mock_model.encode.return_value = np.array([0.1, 0.2, 0.3])
|
||||||
|
emb.EMBEDDING_MODEL = mock_model
|
||||||
|
|
||||||
|
result = embed_text("test")
|
||||||
assert result == pytest.approx([0.1, 0.2, 0.3])
|
assert result == pytest.approx([0.1, 0.2, 0.3])
|
||||||
mock_model.encode.assert_called_once_with("test")
|
mock_model.encode.assert_called_once_with("test")
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user