Compare commits

..

6 Commits

Author SHA1 Message Date
kimi
9ec82ab6ad fix: add integration tests for agentic loop WebSocket broadcasts
Verify that plan_ready, step_complete, step_adapted, and task_complete
events flow through the real WebSocketManager to connected WS clients.
Also tests error resilience when WS disconnects mid-loop.

Fixes #445

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 14:29:25 -04:00
de089cec7f [loop-cycle-524] fix: remove numpy test dependency in test_memory_embeddings (#451) 2026-03-19 14:22:13 -04:00
3590c1689e fix: make _get_loop_agent singleton thread-safe (#449)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:18:27 -04:00
2161c32ae8 fix: add unit tests for agentic_loop.py (#421) (#447)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:13:50 -04:00
98b1142820 [loop-cycle-522] test: add unit tests for agentic_loop.py (#421) (#441) 2026-03-19 14:10:16 -04:00
1d79a36bd8 fix: add unit tests for memory/embeddings.py (#437)
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 11:12:46 -04:00
5 changed files with 1057 additions and 59 deletions

View File

@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio
import logging
import re
import threading
import time
import uuid
from collections.abc import Callable
@@ -59,6 +60,7 @@ class AgenticResult:
# ---------------------------------------------------------------------------
_loop_agent = None
_loop_agent_lock = threading.Lock()
def _get_loop_agent():
@@ -69,9 +71,11 @@ def _get_loop_agent():
"""
global _loop_agent
if _loop_agent is None:
from timmy.agent import create_timmy
with _loop_agent_lock:
if _loop_agent is None:
from timmy.agent import create_timmy
_loop_agent = create_timmy()
_loop_agent = create_timmy()
return _loop_agent

View File

@@ -0,0 +1,386 @@
"""Tests for timmy.agentic_loop — multi-step task execution engine."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.agentic_loop import (
AgenticResult,
AgenticStep,
_parse_steps,
)
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
class TestAgenticStep:
"""Unit tests for the AgenticStep dataclass."""
def test_creation(self):
step = AgenticStep(
step_num=1,
description="Do thing",
result="Done",
status="completed",
duration_ms=42,
)
assert step.step_num == 1
assert step.description == "Do thing"
assert step.result == "Done"
assert step.status == "completed"
assert step.duration_ms == 42
def test_failed_status(self):
step = AgenticStep(
step_num=2, description="Bad step", result="Error", status="failed", duration_ms=10
)
assert step.status == "failed"
def test_adapted_status(self):
step = AgenticStep(
step_num=3, description="Retried", result="OK", status="adapted", duration_ms=100
)
assert step.status == "adapted"
class TestAgenticResult:
"""Unit tests for the AgenticResult dataclass."""
def test_defaults(self):
result = AgenticResult(task_id="abc", task="Test", summary="Done")
assert result.steps == []
assert result.status == "completed"
assert result.total_duration_ms == 0
def test_with_steps(self):
s = AgenticStep(step_num=1, description="A", result="B", status="completed", duration_ms=5)
result = AgenticResult(task_id="x", task="T", summary="S", steps=[s])
assert len(result.steps) == 1
# ---------------------------------------------------------------------------
# _parse_steps — pure function, highly testable
# ---------------------------------------------------------------------------
class TestParseSteps:
"""Unit tests for the plan parser."""
def test_numbered_with_dots(self):
text = "1. First step\n2. Second step\n3. Third step"
steps = _parse_steps(text)
assert steps == ["First step", "Second step", "Third step"]
def test_numbered_with_parens(self):
text = "1) Do this\n2) Do that"
steps = _parse_steps(text)
assert steps == ["Do this", "Do that"]
def test_mixed_numbering(self):
text = "1. Step one\n2) Step two\n3. Step three"
steps = _parse_steps(text)
assert len(steps) == 3
def test_indented_steps(self):
text = " 1. Indented step\n 2. Also indented"
steps = _parse_steps(text)
assert len(steps) == 2
assert steps[0] == "Indented step"
def test_no_numbered_steps_fallback(self):
text = "Do this first\nThen do that\nFinally wrap up"
steps = _parse_steps(text)
assert len(steps) == 3
assert steps[0] == "Do this first"
def test_empty_string(self):
steps = _parse_steps("")
assert steps == []
def test_blank_lines_ignored_in_fallback(self):
text = "Step A\n\n\nStep B\n"
steps = _parse_steps(text)
assert steps == ["Step A", "Step B"]
def test_strips_whitespace(self):
text = "1. Lots of space \n2. Also spaced "
steps = _parse_steps(text)
assert steps[0] == "Lots of space"
assert steps[1] == "Also spaced"
def test_preamble_ignored_when_numbered(self):
text = "Here is the plan:\n1. Step one\n2. Step two"
steps = _parse_steps(text)
assert steps == ["Step one", "Step two"]
# ---------------------------------------------------------------------------
# _get_loop_agent — singleton pattern
# ---------------------------------------------------------------------------
class TestGetLoopAgent:
"""Tests for the agent singleton."""
def test_creates_agent_once(self):
import timmy.agentic_loop as mod
mod._loop_agent = None
mock_agent = MagicMock()
with patch("timmy.agent.create_timmy", return_value=mock_agent) as mock_create:
agent = mod._get_loop_agent()
assert agent is mock_agent
mock_create.assert_called_once()
# Second call should reuse singleton
agent2 = mod._get_loop_agent()
assert agent2 is mock_agent
mock_create.assert_called_once()
mod._loop_agent = None # cleanup
def test_reuses_existing(self):
import timmy.agentic_loop as mod
sentinel = MagicMock()
mod._loop_agent = sentinel
assert mod._get_loop_agent() is sentinel
mod._loop_agent = None # cleanup
# ---------------------------------------------------------------------------
# _broadcast_progress — best-effort WebSocket broadcast
# ---------------------------------------------------------------------------
class TestBroadcastProgress:
"""Tests for the WebSocket broadcast helper."""
@pytest.mark.asyncio
async def test_successful_broadcast(self):
from timmy.agentic_loop import _broadcast_progress
mock_ws = MagicMock()
mock_ws.broadcast = AsyncMock()
mock_module = MagicMock()
mock_module.ws_manager = mock_ws
with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": mock_module}):
await _broadcast_progress("test.event", {"key": "value"})
mock_ws.broadcast.assert_awaited_once_with("test.event", {"key": "value"})
@pytest.mark.asyncio
async def test_import_error_swallowed(self):
"""When ws_manager import fails, broadcast silently succeeds."""
import sys
from timmy.agentic_loop import _broadcast_progress
# Remove the module so import fails
saved = sys.modules.pop("infrastructure.ws_manager.handler", None)
try:
with patch.dict("sys.modules", {"infrastructure": None}):
# Should not raise — errors are swallowed
await _broadcast_progress("fail.event", {})
finally:
if saved is not None:
sys.modules["infrastructure.ws_manager.handler"] = saved
# ---------------------------------------------------------------------------
# run_agentic_loop — integration-style tests with mocked agent
# ---------------------------------------------------------------------------
class TestRunAgenticLoop:
"""Tests for the main agentic loop."""
@pytest.fixture(autouse=True)
def _reset_agent(self):
import timmy.agentic_loop as mod
mod._loop_agent = None
yield
mod._loop_agent = None
def _mock_agent(self, responses):
"""Create a mock agent that returns responses in sequence."""
agent = MagicMock()
run_results = []
for r in responses:
mock_result = MagicMock()
mock_result.content = r
run_results.append(mock_result)
agent.run = MagicMock(side_effect=run_results)
return agent
@pytest.mark.asyncio
async def test_successful_two_step_task(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(
[
"1. Step one\n2. Step two", # planning
"Step one done", # execution step 1
"Step two done", # execution step 2
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
result = await run_agentic_loop("Test task", max_steps=5)
assert result.status == "completed"
assert len(result.steps) == 2
assert result.steps[0].status == "completed"
assert result.steps[1].status == "completed"
assert result.total_duration_ms >= 0
@pytest.mark.asyncio
async def test_planning_failure(self):
from timmy.agentic_loop import run_agentic_loop
agent = MagicMock()
agent.run = MagicMock(side_effect=RuntimeError("LLM down"))
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
):
result = await run_agentic_loop("Broken task", max_steps=3)
assert result.status == "failed"
assert "Planning failed" in result.summary
@pytest.mark.asyncio
async def test_empty_plan(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent([""]) # empty plan
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
):
result = await run_agentic_loop("Empty plan task", max_steps=3)
assert result.status == "failed"
assert "no steps" in result.summary.lower()
@pytest.mark.asyncio
async def test_step_failure_triggers_adaptation(self):
from timmy.agentic_loop import run_agentic_loop
agent = MagicMock()
call_count = 0
def mock_run(prompt, **kwargs):
nonlocal call_count
call_count += 1
result = MagicMock()
if call_count == 1:
result.content = "1. Only step"
elif call_count == 2:
raise RuntimeError("Step failed")
else:
result.content = "Adapted successfully"
return result
agent.run = mock_run
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
result = await run_agentic_loop("Failing task", max_steps=5)
assert len(result.steps) == 1
assert result.steps[0].status == "adapted"
assert "[Adapted]" in result.steps[0].description
@pytest.mark.asyncio
async def test_max_steps_truncation(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(
[
"1. A\n2. B\n3. C\n4. D\n5. E", # 5 steps planned
"Done A",
"Done B",
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
result = await run_agentic_loop("Big task", max_steps=2)
assert result.status == "partial" # was truncated
assert len(result.steps) == 2
@pytest.mark.asyncio
async def test_on_progress_callback(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(
[
"1. Only step",
"Step done",
]
)
progress_calls = []
async def track_progress(desc, step_num, total):
progress_calls.append((desc, step_num, total))
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
await run_agentic_loop("Callback task", max_steps=5, on_progress=track_progress)
assert len(progress_calls) == 1
assert progress_calls[0][1] == 1 # step_num
@pytest.mark.asyncio
async def test_default_max_steps_from_settings(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(["1. Step one", "Done"])
mock_settings = MagicMock()
mock_settings.max_agent_steps = 7
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
patch("config.settings", mock_settings),
):
result = await run_agentic_loop("Settings task")
assert result.status == "completed"
@pytest.mark.asyncio
async def test_task_id_generated(self):
from timmy.agentic_loop import run_agentic_loop
agent = self._mock_agent(["1. Step", "OK"])
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
patch("timmy.session._clean_response", side_effect=lambda x: x),
):
result = await run_agentic_loop("ID task", max_steps=5)
assert result.task_id # non-empty
assert len(result.task_id) == 8 # uuid[:8]

View File

@@ -0,0 +1,281 @@
"""Integration tests for agentic loop WebSocket broadcasts.
Verifies that agentic loop events (plan_ready, step_complete, task_complete)
flow through the real WebSocketManager to connected clients, and that the
loop survives a WS disconnect mid-execution.
Fixes #445
"""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.ws_manager.handler import WebSocketManager
from timmy.agentic_loop import run_agentic_loop
def _mock_run(content: str):
"""Create a mock return value for agent.run()."""
m = MagicMock()
m.content = content
return m
def _fake_ws() -> AsyncMock:
"""Return an AsyncMock that behaves like a WebSocket connection."""
ws = AsyncMock()
ws.send_text = AsyncMock()
return ws
def _collect_events(ws_mock: AsyncMock) -> list[dict]:
"""Extract parsed JSON events from a mock WebSocket's send_text calls."""
events = []
for call in ws_mock.send_text.call_args_list:
raw = call.args[0] if call.args else call.kwargs.get("data", "")
events.append(json.loads(raw))
return events
# ---------------------------------------------------------------------------
# Integration: events reach a real WebSocketManager
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_plan_ready_event_received_by_ws_client():
"""A connected WS client receives an agentic.plan_ready event."""
mgr = WebSocketManager()
ws = _fake_ws()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Gather data\n2. Summarise"),
_mock_run("Data gathered"),
_mock_run("Summary written"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Gather and summarise", max_steps=2)
events = _collect_events(ws)
plan_events = [e for e in events if e["event"] == "agentic.plan_ready"]
assert len(plan_events) == 1
assert plan_events[0]["data"]["steps"] == ["Gather data", "Summarise"]
assert plan_events[0]["data"]["total"] == 2
@pytest.mark.asyncio
async def test_step_complete_events_received_by_ws_client():
"""A connected WS client receives agentic.step_complete for each step."""
mgr = WebSocketManager()
ws = _fake_ws()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Step A\n2. Step B"),
_mock_run("A done"),
_mock_run("B done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Do A and B", max_steps=2)
events = _collect_events(ws)
step_events = [e for e in events if e["event"] == "agentic.step_complete"]
assert len(step_events) == 2
assert step_events[0]["data"]["step"] == 1
assert step_events[1]["data"]["step"] == 2
@pytest.mark.asyncio
async def test_task_complete_event_received_by_ws_client():
"""A connected WS client receives agentic.task_complete at the end."""
mgr = WebSocketManager()
ws = _fake_ws()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Only step"),
_mock_run("Done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("One step task", max_steps=1)
events = _collect_events(ws)
task_events = [e for e in events if e["event"] == "agentic.task_complete"]
assert len(task_events) == 1
assert task_events[0]["data"]["status"] == "completed"
assert task_events[0]["data"]["task_id"] == result.task_id
@pytest.mark.asyncio
async def test_all_event_types_received_in_order():
"""Client receives plan_ready → step_complete(s) → task_complete in order."""
mgr = WebSocketManager()
ws = _fake_ws()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. First\n2. Second"),
_mock_run("First done"),
_mock_run("Second done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Two steps", max_steps=2)
events = _collect_events(ws)
event_names = [e["event"] for e in events]
assert event_names == [
"agentic.plan_ready",
"agentic.step_complete",
"agentic.step_complete",
"agentic.task_complete",
]
@pytest.mark.asyncio
async def test_adapted_step_broadcasts_step_adapted():
"""When a step fails and adapts, client receives agentic.step_adapted."""
mgr = WebSocketManager()
ws = _fake_ws()
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Risky step"),
Exception("network error"),
_mock_run("Adapted approach worked"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Risky task", max_steps=1)
events = _collect_events(ws)
event_names = [e["event"] for e in events]
assert "agentic.step_adapted" in event_names
adapted = [e for e in events if e["event"] == "agentic.step_adapted"][0]
assert adapted["data"]["error"] == "network error"
# ---------------------------------------------------------------------------
# Resilience: WS disconnect mid-loop
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ws_disconnect_mid_loop_does_not_crash():
"""If a WS client disconnects during the loop, the loop completes."""
mgr = WebSocketManager()
ws = _fake_ws()
# First send succeeds (plan_ready), then WS dies
ws.send_text = AsyncMock(side_effect=[None, ConnectionError("gone")])
mgr._connections = [ws]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Step A\n2. Step B"),
_mock_run("A done"),
_mock_run("B done"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("Survive disconnect", max_steps=2)
# Loop completed despite WS failure
assert result.status == "completed"
assert len(result.steps) == 2
@pytest.mark.asyncio
async def test_no_ws_connections_does_not_crash():
"""Loop completes normally when no WS clients are connected."""
mgr = WebSocketManager()
# No connections at all
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Solo step"),
_mock_run("Done alone"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
result = await run_agentic_loop("No audience", max_steps=1)
assert result.status == "completed"
assert len(result.steps) == 1
@pytest.mark.asyncio
async def test_multiple_ws_clients_all_receive_events():
"""All connected WS clients receive the same broadcast events."""
mgr = WebSocketManager()
ws1 = _fake_ws()
ws2 = _fake_ws()
mgr._connections = [ws1, ws2]
mock_agent = MagicMock()
mock_agent.run = MagicMock(
side_effect=[
_mock_run("1. Shared step"),
_mock_run("Shared result"),
]
)
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=mock_agent),
patch("infrastructure.ws_manager.handler.ws_manager", mgr),
):
await run_agentic_loop("Broadcast to all", max_steps=1)
events1 = _collect_events(ws1)
events2 = _collect_events(ws2)
names1 = [e["event"] for e in events1]
names2 = [e["event"] for e in events2]
assert names1 == names2
assert "agentic.plan_ready" in names1
assert "agentic.task_complete" in names1

View File

@@ -0,0 +1,319 @@
"""Unit tests for timmy.agentic_loop — agentic loop data structures, parsing, and execution."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.agentic_loop import (
AgenticResult,
AgenticStep,
_broadcast_progress,
_parse_steps,
run_agentic_loop,
)
# ── Data structures ──────────────────────────────────────────────────────────
class TestAgenticStep:
def test_fields(self):
step = AgenticStep(
step_num=1,
description="Do something",
result="Done",
status="completed",
duration_ms=42,
)
assert step.step_num == 1
assert step.description == "Do something"
assert step.result == "Done"
assert step.status == "completed"
assert step.duration_ms == 42
class TestAgenticResult:
def test_defaults(self):
r = AgenticResult(task_id="abc", task="test task", summary="ok")
assert r.steps == []
assert r.status == "completed"
assert r.total_duration_ms == 0
def test_with_steps(self):
step = AgenticStep(1, "s", "r", "completed", 10)
r = AgenticResult(task_id="x", task="t", summary="s", steps=[step])
assert len(r.steps) == 1
# ── _parse_steps ─────────────────────────────────────────────────────────────
class TestParseSteps:
def test_numbered_dot(self):
text = "1. First step\n2. Second step\n3. Third step"
assert _parse_steps(text) == ["First step", "Second step", "Third step"]
def test_numbered_paren(self):
text = "1) Alpha\n2) Beta"
assert _parse_steps(text) == ["Alpha", "Beta"]
def test_mixed_whitespace(self):
text = " 1. Indented step\n 2. Another "
result = _parse_steps(text)
assert result == ["Indented step", "Another"]
def test_fallback_plain_lines(self):
text = "Do this\nDo that\nDo the other"
assert _parse_steps(text) == ["Do this", "Do that", "Do the other"]
def test_empty_string(self):
assert _parse_steps("") == []
def test_blank_lines_skipped_in_fallback(self):
text = "line one\n\nline two\n \nline three"
assert _parse_steps(text) == ["line one", "line two", "line three"]
# ── _get_loop_agent ──────────────────────────────────────────────────────────
class TestGetLoopAgent:
def test_creates_agent_once(self):
import timmy.agentic_loop as al
saved = al._loop_agent
try:
al._loop_agent = None
mock_agent = MagicMock()
with patch("timmy.agent.create_timmy", return_value=mock_agent):
result = al._get_loop_agent()
assert result is mock_agent
# Second call returns cached
result2 = al._get_loop_agent()
assert result2 is mock_agent
finally:
al._loop_agent = saved
def test_returns_cached(self):
import timmy.agentic_loop as al
saved = al._loop_agent
try:
sentinel = object()
al._loop_agent = sentinel
assert al._get_loop_agent() is sentinel
finally:
al._loop_agent = saved
# ── _broadcast_progress ──────────────────────────────────────────────────────
class TestBroadcastProgress:
@pytest.mark.asyncio
async def test_success(self):
mock_ws = AsyncMock()
with (
patch("timmy.agentic_loop.ws_manager", mock_ws, create=True),
patch.dict(
"sys.modules",
{"infrastructure.ws_manager.handler": MagicMock(ws_manager=mock_ws)},
),
):
await _broadcast_progress("test.event", {"key": "val"})
mock_ws.broadcast.assert_awaited_once_with("test.event", {"key": "val"})
@pytest.mark.asyncio
async def test_import_error_swallowed(self):
with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": None}):
# Should not raise
await _broadcast_progress("test.event", {})
# ── run_agentic_loop ─────────────────────────────────────────────────────────
def _make_mock_agent(plan_text, step_responses=None):
"""Create a mock agent whose .run returns predictable content."""
call_count = 0
def run_side_effect(prompt, *, stream=False, session_id=""):
nonlocal call_count
call_count += 1
resp = MagicMock()
if call_count == 1:
# Planning call
resp.content = plan_text
else:
idx = call_count - 2 # step index (0-based)
if step_responses and idx < len(step_responses):
val = step_responses[idx]
if isinstance(val, Exception):
raise val
resp.content = val
else:
resp.content = f"Step result {call_count}"
return resp
agent = MagicMock()
agent.run = MagicMock(side_effect=run_side_effect)
return agent
@pytest.fixture
def _patch_broadcast():
with patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock):
yield
@pytest.fixture
def _patch_clean_response():
with patch("timmy.session._clean_response", side_effect=lambda x: x):
yield
class TestRunAgenticLoop:
@pytest.mark.asyncio
async def test_successful_execution(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent("1. Step A\n2. Step B", ["Result A", "Result B"])
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch("timmy.agentic_loop.settings", mock_settings, create=True),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=5)
assert result.status == "completed"
assert len(result.steps) == 2
assert result.steps[0].status == "completed"
assert result.steps[0].description == "Step A"
assert result.total_duration_ms >= 0
@pytest.mark.asyncio
async def test_planning_failure(self, _patch_broadcast):
agent = MagicMock()
agent.run = MagicMock(side_effect=RuntimeError("LLM down"))
mock_settings = MagicMock()
mock_settings.max_agent_steps = 5
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=3)
assert result.status == "failed"
assert "Planning failed" in result.summary
@pytest.mark.asyncio
async def test_empty_plan(self, _patch_broadcast):
agent = _make_mock_agent("")
mock_settings = MagicMock()
mock_settings.max_agent_steps = 5
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=3)
assert result.status == "failed"
assert "no steps" in result.summary.lower()
@pytest.mark.asyncio
async def test_step_failure_triggers_adaptation(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent(
"1. Do X\n2. Do Y",
[RuntimeError("oops"), "Adapted result", "Y done"],
)
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=5)
# Step 1 should be adapted, step 2 completed
statuses = [s.status for s in result.steps]
assert "adapted" in statuses
@pytest.mark.asyncio
async def test_truncation_marks_partial(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent(
"1. A\n2. B\n3. C\n4. D\n5. E",
["r1", "r2"],
)
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=2)
assert result.status == "partial"
@pytest.mark.asyncio
async def test_on_progress_callback(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent("1. Only step", ["done"])
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
callback = AsyncMock()
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=5, on_progress=callback)
callback.assert_awaited_once_with("Only step", 1, 1)
assert result.status == "completed"
@pytest.mark.asyncio
async def test_default_max_steps_from_settings(self, _patch_broadcast, _patch_clean_response):
agent = _make_mock_agent("1. S1", ["r1"])
mock_settings = MagicMock()
mock_settings.max_agent_steps = 3
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff") # max_steps=0 → from settings
assert result.status == "completed"
@pytest.mark.asyncio
async def test_failed_step_and_failed_adaptation(self, _patch_broadcast, _patch_clean_response):
"""When both step and adaptation fail, step is marked failed."""
call_count = 0
def run_side_effect(prompt, *, stream=False, session_id=""):
nonlocal call_count
call_count += 1
if call_count == 1:
resp = MagicMock()
resp.content = "1. Only step"
return resp
# Both step execution and adaptation fail
raise RuntimeError("everything broken")
agent = MagicMock()
agent.run = MagicMock(side_effect=run_side_effect)
mock_settings = MagicMock()
mock_settings.max_agent_steps = 10
with (
patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}),
):
result = await run_agentic_loop("do stuff", max_steps=5)
assert result.steps[0].status == "failed"
assert "Failed" in result.steps[0].result
assert result.status == "partial"

View File

@@ -13,44 +13,46 @@ from timmy.memory.embeddings import (
embed_text,
)
# ── _simple_hash_embedding ──────────────────────────────────────────────────
# ── _simple_hash_embedding ──────────────────────────────────────────────────
class TestSimpleHashEmbedding:
"""Tests for the deterministic hash-based fallback embedding."""
def test_returns_128_floats(self):
def test_returns_128_dim_vector(self):
vec = _simple_hash_embedding("hello world")
assert len(vec) == 128
assert all(isinstance(x, float) for x in vec)
def test_deterministic(self):
"""Same input always produces the same vector."""
assert _simple_hash_embedding("test") == _simple_hash_embedding("test")
def test_normalized(self):
"""Output vector has unit magnitude."""
vec = _simple_hash_embedding("some text for testing")
vec = _simple_hash_embedding("some text for embedding")
mag = math.sqrt(sum(x * x for x in vec))
assert mag == pytest.approx(1.0, abs=1e-6)
def test_deterministic(self):
a = _simple_hash_embedding("same input")
b = _simple_hash_embedding("same input")
assert a == b
def test_different_texts_differ(self):
a = _simple_hash_embedding("hello world")
b = _simple_hash_embedding("goodbye moon")
assert a != b
def test_empty_string(self):
"""Empty string doesn't crash — returns a zero-ish vector."""
vec = _simple_hash_embedding("")
assert len(vec) == 128
# All zeros normalised stays zero (mag fallback to 1.0)
assert all(x == 0.0 for x in vec)
def test_different_inputs_differ(self):
a = _simple_hash_embedding("alpha")
b = _simple_hash_embedding("beta")
assert a != b
def test_long_text_truncates_at_50_words(self):
"""Words beyond 50 should not change the result."""
short = " ".join(f"word{i}" for i in range(50))
long = short + " extra1 extra2 extra3"
assert _simple_hash_embedding(short) == _simple_hash_embedding(long)
# ── cosine_similarity ────────────────────────────────────────────────────────
class TestCosineSimilarity:
"""Tests for cosine similarity calculation."""
def test_identical_vectors(self):
v = [1.0, 2.0, 3.0]
assert cosine_similarity(v, v) == pytest.approx(1.0)
@@ -69,86 +71,92 @@ class TestCosineSimilarity:
assert cosine_similarity([0.0, 0.0], [1.0, 2.0]) == 0.0
assert cosine_similarity([1.0, 2.0], [0.0, 0.0]) == 0.0
def test_different_length_uses_zip(self):
"""zip(strict=False) truncates to shortest — verify no crash."""
result = cosine_similarity([1.0, 0.0], [1.0, 0.0, 9.9])
# mag_b includes the extra element, so result < 1.0
assert isinstance(result, float)
def test_both_zero_vectors(self):
assert cosine_similarity([0.0], [0.0]) == 0.0
# ── _keyword_overlap ─────────────────────────────────────────────────────────
class TestKeywordOverlap:
"""Tests for keyword overlap scoring."""
def test_full_overlap(self):
assert _keyword_overlap("hello world", "hello world extra") == pytest.approx(1.0)
assert _keyword_overlap("hello world", "hello world") == pytest.approx(1.0)
def test_partial_overlap(self):
assert _keyword_overlap("hello world", "hello there") == pytest.approx(0.5)
assert _keyword_overlap("hello world", "hello moon") == pytest.approx(0.5)
def test_no_overlap(self):
assert _keyword_overlap("alpha", "beta") == pytest.approx(0.0)
assert _keyword_overlap("hello", "goodbye") == pytest.approx(0.0)
def test_empty_query(self):
assert _keyword_overlap("", "some content") == 0.0
assert _keyword_overlap("", "anything") == 0.0
def test_case_insensitive(self):
assert _keyword_overlap("Hello", "hello world") == pytest.approx(1.0)
assert _keyword_overlap("Hello World", "hello world") == pytest.approx(1.0)
# ── embed_text ───────────────────────────────────────────────────────────────
class TestEmbedText:
"""Tests for the main embed_text entry point."""
def setup_method(self):
self._saved_model = emb.EMBEDDING_MODEL
emb.EMBEDDING_MODEL = None
def test_uses_fallback_when_model_false(self):
"""When _get_embedding_model returns False, use hash fallback."""
with patch.object(emb, "EMBEDDING_MODEL", False):
with patch.object(emb, "_get_embedding_model", return_value=False):
vec = embed_text("hello")
assert len(vec) == 128
def teardown_method(self):
emb.EMBEDDING_MODEL = self._saved_model
def test_uses_fallback_when_model_disabled(self):
emb.EMBEDDING_MODEL = False
vec = embed_text("test")
assert len(vec) == 128 # hash fallback dimension
def test_uses_model_when_available(self):
"""When a real model is loaded, call model.encode()."""
mock_encoding = MagicMock()
mock_encoding.tolist.return_value = [0.1, 0.2, 0.3]
mock_model = MagicMock()
mock_model.encode.return_value = MagicMock(tolist=MagicMock(return_value=[0.1, 0.2]))
with patch.object(emb, "_get_embedding_model", return_value=mock_model):
vec = embed_text("hello")
assert vec == [0.1, 0.2]
mock_model.encode.assert_called_once_with("hello")
mock_model.encode.return_value = mock_encoding
emb.EMBEDDING_MODEL = mock_model
result = embed_text("test")
assert result == pytest.approx([0.1, 0.2, 0.3])
mock_model.encode.assert_called_once_with("test")
# ── _get_embedding_model ─────────────────────────────────────────────────────
class TestGetEmbeddingModel:
"""Tests for lazy model loading."""
def setup_method(self):
"""Reset global state before each test."""
self._saved_model = emb.EMBEDDING_MODEL
emb.EMBEDDING_MODEL = None
def teardown_method(self):
emb.EMBEDDING_MODEL = self._saved_model
def test_skip_embeddings_setting(self):
"""When settings.timmy_skip_embeddings is True, model is set to False."""
mock_settings = MagicMock()
mock_settings.timmy_skip_embeddings = True
with patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}):
emb.EMBEDDING_MODEL = None
result = emb._get_embedding_model()
assert result is False
assert result is False
def test_sentence_transformers_import_error(self):
"""When sentence-transformers is missing, falls back to False."""
def test_fallback_when_transformers_missing(self):
mock_settings = MagicMock()
mock_settings.timmy_skip_embeddings = False
with patch.dict("sys.modules", {"config": MagicMock(settings=mock_settings)}):
with patch.dict("sys.modules", {"sentence_transformers": None}):
emb.EMBEDDING_MODEL = None
result = emb._get_embedding_model()
assert result is False
with patch.dict(
"sys.modules",
{
"config": MagicMock(settings=mock_settings),
"sentence_transformers": None,
},
):
emb.EMBEDDING_MODEL = None
result = emb._get_embedding_model()
assert result is False
def teardown_method(self):
emb.EMBEDDING_MODEL = None
def test_returns_cached_model(self):
sentinel = object()
emb.EMBEDDING_MODEL = sentinel
assert emb._get_embedding_model() is sentinel