Compare commits

...

6 Commits

Author SHA1 Message Date
kimi
9ec82ab6ad fix: add integration tests for agentic loop WebSocket broadcasts
All checks were successful
Tests / lint (pull_request) Successful in 14s
Tests / test (pull_request) Successful in 1m16s
Verify that plan_ready, step_complete, step_adapted, and task_complete
events flow through the real WebSocketManager to connected WS clients.
Also tests error resilience when WS disconnects mid-loop.

Fixes #445

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-19 14:29:25 -04:00
de089cec7f [loop-cycle-524] fix: remove numpy test dependency in test_memory_embeddings (#451)
All checks were successful
Tests / lint (push) Successful in 2s
Tests / test (push) Successful in 1m38s
2026-03-19 14:22:13 -04:00
3590c1689e fix: make _get_loop_agent singleton thread-safe (#449)
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 1m20s
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:18:27 -04:00
2161c32ae8 fix: add unit tests for agentic_loop.py (#421) (#447)
Some checks failed
Tests / lint (push) Successful in 2s
Tests / test (push) Failing after 1m12s
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 14:13:50 -04:00
98b1142820 [loop-cycle-522] test: add unit tests for agentic_loop.py (#421) (#441)
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 1m5s
2026-03-19 14:10:16 -04:00
1d79a36bd8 fix: add unit tests for memory/embeddings.py (#437)
Some checks failed
Tests / lint (push) Successful in 3s
Tests / test (push) Failing after 1m5s
Co-authored-by: Kimi Agent <kimi@timmy.local>
Co-committed-by: Kimi Agent <kimi@timmy.local>
2026-03-19 11:12:46 -04:00
5 changed files with 1154 additions and 2 deletions

View File

@@ -18,6 +18,7 @@ from __future__ import annotations
import asyncio import asyncio
import logging import logging
import re import re
import threading
import time import time
import uuid import uuid
from collections.abc import Callable from collections.abc import Callable
@@ -59,6 +60,7 @@ class AgenticResult:
# --------------------------------------------------------------------------- # ---------------------------------------------------------------------------
_loop_agent = None _loop_agent = None
_loop_agent_lock = threading.Lock()
def _get_loop_agent(): def _get_loop_agent():
@@ -69,9 +71,11 @@ def _get_loop_agent():
""" """
global _loop_agent global _loop_agent
if _loop_agent is None: if _loop_agent is None:
from timmy.agent import create_timmy with _loop_agent_lock:
if _loop_agent is None:
from timmy.agent import create_timmy
_loop_agent = create_timmy() _loop_agent = create_timmy()
return _loop_agent return _loop_agent

View File

@@ -0,0 +1,386 @@
"""Tests for timmy.agentic_loop — multi-step task execution engine."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.agentic_loop import (
AgenticResult,
AgenticStep,
_parse_steps,
)
# ---------------------------------------------------------------------------
# Data structures
# ---------------------------------------------------------------------------
class TestAgenticStep:
    """Construction checks for the AgenticStep record type."""

    def test_creation(self):
        """Every constructor argument is readable back from the same-named attribute."""
        expected = {
            "step_num": 1,
            "description": "Do thing",
            "result": "Done",
            "status": "completed",
            "duration_ms": 42,
        }
        record = AgenticStep(**expected)
        for field_name, value in expected.items():
            assert getattr(record, field_name) == value

    def test_failed_status(self):
        """A step can be created with the "failed" status."""
        record = AgenticStep(
            step_num=2,
            description="Bad step",
            result="Error",
            status="failed",
            duration_ms=10,
        )
        assert record.status == "failed"

    def test_adapted_status(self):
        """A step can be created with the "adapted" status."""
        record = AgenticStep(
            step_num=3,
            description="Retried",
            result="OK",
            status="adapted",
            duration_ms=100,
        )
        assert record.status == "adapted"
class TestAgenticResult:
    """Construction checks for the AgenticResult record type."""

    def test_defaults(self):
        """Omitted fields default to no steps, status "completed" and 0 ms."""
        outcome = AgenticResult(task_id="abc", task="Test", summary="Done")
        assert outcome.steps == []
        assert outcome.status == "completed"
        assert outcome.total_duration_ms == 0

    def test_with_steps(self):
        """An explicitly supplied step list is kept on the result."""
        only_step = AgenticStep(
            step_num=1,
            description="A",
            result="B",
            status="completed",
            duration_ms=5,
        )
        outcome = AgenticResult(task_id="x", task="T", summary="S", steps=[only_step])
        assert len(outcome.steps) == 1
# ---------------------------------------------------------------------------
# _parse_steps — pure function, highly testable
# ---------------------------------------------------------------------------
class TestParseSteps:
    """Checks of _parse_steps against numbered and un-numbered plan text."""

    def test_numbered_with_dots(self):
        """Lines of the form "N. text" become one step each."""
        parsed = _parse_steps("1. First step\n2. Second step\n3. Third step")
        assert parsed == ["First step", "Second step", "Third step"]

    def test_numbered_with_parens(self):
        """Lines of the form "N) text" become one step each."""
        parsed = _parse_steps("1) Do this\n2) Do that")
        assert parsed == ["Do this", "Do that"]

    def test_mixed_numbering(self):
        """Dot and paren numbering may be mixed within one plan."""
        parsed = _parse_steps("1. Step one\n2) Step two\n3. Step three")
        assert len(parsed) == 3

    def test_indented_steps(self):
        """Leading whitespace before the number does not hide a step."""
        parsed = _parse_steps(" 1. Indented step\n 2. Also indented")
        assert len(parsed) == 2
        assert parsed[0] == "Indented step"

    def test_no_numbered_steps_fallback(self):
        """Without any numbering, each line is taken as a step."""
        parsed = _parse_steps("Do this first\nThen do that\nFinally wrap up")
        assert len(parsed) == 3
        assert parsed[0] == "Do this first"

    def test_empty_string(self):
        """Empty input yields no steps."""
        assert _parse_steps("") == []

    def test_blank_lines_ignored_in_fallback(self):
        """Blank lines are dropped when falling back to line-per-step parsing."""
        parsed = _parse_steps("Step A\n\n\nStep B\n")
        assert parsed == ["Step A", "Step B"]

    def test_strips_whitespace(self):
        """Trailing whitespace is removed from each parsed step."""
        first, second = _parse_steps("1. Lots of space \n2. Also spaced ")[:2]
        assert first == "Lots of space"
        assert second == "Also spaced"

    def test_preamble_ignored_when_numbered(self):
        """An un-numbered intro line is skipped once numbered steps are present."""
        parsed = _parse_steps("Here is the plan:\n1. Step one\n2. Step two")
        assert parsed == ["Step one", "Step two"]
# ---------------------------------------------------------------------------
# _get_loop_agent — singleton pattern
# ---------------------------------------------------------------------------
class TestGetLoopAgent:
    """Tests for the lazily created, module-level agent singleton.

    Both tests swap ``timmy.agentic_loop._loop_agent`` via ``patch.object`` so
    the previous value is restored on exit even when an assertion fails; a
    trailing manual reset would be skipped on failure and leak the mock agent
    into later tests.
    """

    def test_creates_agent_once(self):
        """The factory is invoked on the first call only; later calls reuse it."""
        import timmy.agentic_loop as mod

        mock_agent = MagicMock()
        with (
            # Start from an empty singleton; restored automatically on exit.
            patch.object(mod, "_loop_agent", None),
            patch("timmy.agent.create_timmy", return_value=mock_agent) as mock_create,
        ):
            agent = mod._get_loop_agent()
            assert agent is mock_agent
            mock_create.assert_called_once()

            # Second call should reuse singleton
            agent2 = mod._get_loop_agent()
            assert agent2 is mock_agent
            mock_create.assert_called_once()

    def test_reuses_existing(self):
        """An already populated singleton is returned unchanged."""
        import timmy.agentic_loop as mod

        sentinel = MagicMock()
        with patch.object(mod, "_loop_agent", sentinel):
            assert mod._get_loop_agent() is sentinel
# ---------------------------------------------------------------------------
# _broadcast_progress — best-effort WebSocket broadcast
# ---------------------------------------------------------------------------
class TestBroadcastProgress:
    """Tests for the best-effort WebSocket broadcast helper."""

    @pytest.mark.asyncio
    async def test_successful_broadcast(self):
        """The event name and payload are forwarded to ``ws_manager.broadcast``."""
        from timmy.agentic_loop import _broadcast_progress

        mock_ws = MagicMock()
        mock_ws.broadcast = AsyncMock()
        mock_module = MagicMock()
        mock_module.ws_manager = mock_ws

        # Stand in for the real handler module so the helper picks up mock_ws.
        with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": mock_module}):
            await _broadcast_progress("test.event", {"key": "value"})

        mock_ws.broadcast.assert_awaited_once_with("test.event", {"key": "value"})

    @pytest.mark.asyncio
    async def test_import_error_swallowed(self):
        """When ws_manager import fails, broadcast silently succeeds."""
        from timmy.agentic_loop import _broadcast_progress

        # Mapping the exact dotted name to None makes any import of it raise
        # ModuleNotFoundError. Blocking only the parent package is unreliable:
        # if "infrastructure.ws_manager" is already cached in sys.modules the
        # import machinery never consults the "infrastructure" entry and the
        # handler would import successfully. patch.dict restores the original
        # entry on exit, so no manual save/restore is needed.
        with patch.dict("sys.modules", {"infrastructure.ws_manager.handler": None}):
            # Should not raise — errors are swallowed
            await _broadcast_progress("fail.event", {})
# ---------------------------------------------------------------------------
# run_agentic_loop — integration-style tests with mocked agent
# ---------------------------------------------------------------------------
class TestRunAgenticLoop:
    """Checks of run_agentic_loop driven by a scripted mock agent.

    Every test replaces ``_get_loop_agent`` and ``_broadcast_progress`` so no
    real agent or WebSocket manager is involved.
    """

    @pytest.fixture(autouse=True)
    def _reset_agent(self):
        """Clear the module-level agent singleton before and after each test."""
        import timmy.agentic_loop as loop_module

        loop_module._loop_agent = None
        yield
        loop_module._loop_agent = None

    def _mock_agent(self, responses):
        """Create a mock agent that returns responses in sequence."""
        scripted = [MagicMock(content=text) for text in responses]
        agent = MagicMock()
        agent.run = MagicMock(side_effect=scripted)
        return agent

    @pytest.mark.asyncio
    async def test_successful_two_step_task(self):
        """A two-step plan whose steps both succeed yields a completed result."""
        from timmy.agentic_loop import run_agentic_loop

        scripted_agent = self._mock_agent(
            [
                "1. Step one\n2. Step two",  # planning
                "Step one done",  # execution step 1
                "Step two done",  # execution step 2
            ]
        )
        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            outcome = await run_agentic_loop("Test task", max_steps=5)

        assert outcome.status == "completed"
        assert [step.status for step in outcome.steps] == ["completed", "completed"]
        assert outcome.total_duration_ms >= 0

    @pytest.mark.asyncio
    async def test_planning_failure(self):
        """An exception from the planning call produces a failed result."""
        from timmy.agentic_loop import run_agentic_loop

        broken_agent = MagicMock()
        broken_agent.run = MagicMock(side_effect=RuntimeError("LLM down"))
        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=broken_agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
        ):
            outcome = await run_agentic_loop("Broken task", max_steps=3)

        assert outcome.status == "failed"
        assert "Planning failed" in outcome.summary

    @pytest.mark.asyncio
    async def test_empty_plan(self):
        """A plan with no content produces a failed result mentioning "no steps"."""
        from timmy.agentic_loop import run_agentic_loop

        scripted_agent = self._mock_agent([""])  # empty plan
        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
        ):
            outcome = await run_agentic_loop("Empty plan task", max_steps=3)

        assert outcome.status == "failed"
        assert "no steps" in outcome.summary.lower()

    @pytest.mark.asyncio
    async def test_step_failure_triggers_adaptation(self):
        """A step that raises is retried and recorded as "adapted"."""
        from timmy.agentic_loop import run_agentic_loop

        invocations = 0

        def scripted_run(prompt, **kwargs):
            # Call 1 plans, call 2 fails the step, later calls succeed.
            nonlocal invocations
            invocations += 1
            if invocations == 2:
                raise RuntimeError("Step failed")
            reply = MagicMock()
            reply.content = "1. Only step" if invocations == 1 else "Adapted successfully"
            return reply

        flaky_agent = MagicMock()
        flaky_agent.run = scripted_run
        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=flaky_agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            outcome = await run_agentic_loop("Failing task", max_steps=5)

        assert len(outcome.steps) == 1
        recorded = outcome.steps[0]
        assert recorded.status == "adapted"
        assert "[Adapted]" in recorded.description

    @pytest.mark.asyncio
    async def test_max_steps_truncation(self):
        """A plan longer than max_steps is cut short and reported as partial."""
        from timmy.agentic_loop import run_agentic_loop

        scripted_agent = self._mock_agent(
            [
                "1. A\n2. B\n3. C\n4. D\n5. E",  # 5 steps planned
                "Done A",
                "Done B",
            ]
        )
        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            outcome = await run_agentic_loop("Big task", max_steps=2)

        assert outcome.status == "partial"  # was truncated
        assert len(outcome.steps) == 2

    @pytest.mark.asyncio
    async def test_on_progress_callback(self):
        """The on_progress coroutine is awaited once for a one-step plan."""
        from timmy.agentic_loop import run_agentic_loop

        scripted_agent = self._mock_agent(
            [
                "1. Only step",
                "Step done",
            ]
        )
        seen = []

        async def record_progress(desc, step_num, total):
            seen.append((desc, step_num, total))

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            await run_agentic_loop("Callback task", max_steps=5, on_progress=record_progress)

        assert len(seen) == 1
        assert seen[0][1] == 1  # step_num

    @pytest.mark.asyncio
    async def test_default_max_steps_from_settings(self):
        """The loop completes when max_steps is omitted and config.settings is mocked."""
        from timmy.agentic_loop import run_agentic_loop

        scripted_agent = self._mock_agent(["1. Step one", "Done"])
        fake_settings = MagicMock()
        fake_settings.max_agent_steps = 7
        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
            patch("config.settings", fake_settings),
        ):
            outcome = await run_agentic_loop("Settings task")

        assert outcome.status == "completed"

    @pytest.mark.asyncio
    async def test_task_id_generated(self):
        """Each run is given a non-empty, eight-character task id."""
        from timmy.agentic_loop import run_agentic_loop

        scripted_agent = self._mock_agent(["1. Step", "OK"])
        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock),
            patch("timmy.session._clean_response", side_effect=lambda x: x),
        ):
            outcome = await run_agentic_loop("ID task", max_steps=5)

        assert outcome.task_id  # non-empty
        assert len(outcome.task_id) == 8  # uuid[:8]

View File

@@ -0,0 +1,281 @@
"""Integration tests for agentic loop WebSocket broadcasts.
Verifies that agentic loop events (plan_ready, step_complete, task_complete)
flow through the real WebSocketManager to connected clients, and that the
loop survives a WS disconnect mid-execution.
Fixes #445
"""
import json
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from infrastructure.ws_manager.handler import WebSocketManager
from timmy.agentic_loop import run_agentic_loop
def _mock_run(content: str):
"""Create a mock return value for agent.run()."""
m = MagicMock()
m.content = content
return m
def _fake_ws() -> AsyncMock:
"""Return an AsyncMock that behaves like a WebSocket connection."""
ws = AsyncMock()
ws.send_text = AsyncMock()
return ws
def _collect_events(ws_mock: AsyncMock) -> list[dict]:
"""Extract parsed JSON events from a mock WebSocket's send_text calls."""
events = []
for call in ws_mock.send_text.call_args_list:
raw = call.args[0] if call.args else call.kwargs.get("data", "")
events.append(json.loads(raw))
return events
# ---------------------------------------------------------------------------
# Integration: events reach a real WebSocketManager
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_plan_ready_event_received_by_ws_client():
    """A connected WS client receives an agentic.plan_ready event."""
    client = _fake_ws()
    manager = WebSocketManager()
    manager._connections = [client]

    # First reply is the plan, the following two are the step results.
    scripted_replies = [
        _mock_run("1. Gather data\n2. Summarise"),
        _mock_run("Data gathered"),
        _mock_run("Summary written"),
    ]
    agent = MagicMock()
    agent.run = MagicMock(side_effect=scripted_replies)

    with (
        patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
        patch("infrastructure.ws_manager.handler.ws_manager", manager),
    ):
        await run_agentic_loop("Gather and summarise", max_steps=2)

    plan_events = [
        evt for evt in _collect_events(client) if evt["event"] == "agentic.plan_ready"
    ]
    assert len(plan_events) == 1
    payload = plan_events[0]["data"]
    assert payload["steps"] == ["Gather data", "Summarise"]
    assert payload["total"] == 2
@pytest.mark.asyncio
async def test_step_complete_events_received_by_ws_client():
    """A connected WS client receives agentic.step_complete for each step."""
    client = _fake_ws()
    manager = WebSocketManager()
    manager._connections = [client]

    scripted_replies = [
        _mock_run("1. Step A\n2. Step B"),
        _mock_run("A done"),
        _mock_run("B done"),
    ]
    agent = MagicMock()
    agent.run = MagicMock(side_effect=scripted_replies)

    with (
        patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
        patch("infrastructure.ws_manager.handler.ws_manager", manager),
    ):
        await run_agentic_loop("Do A and B", max_steps=2)

    step_events = [
        evt for evt in _collect_events(client) if evt["event"] == "agentic.step_complete"
    ]
    assert len(step_events) == 2
    first, second = step_events
    assert first["data"]["step"] == 1
    assert second["data"]["step"] == 2
@pytest.mark.asyncio
async def test_task_complete_event_received_by_ws_client():
    """A connected WS client receives agentic.task_complete at the end."""
    client = _fake_ws()
    manager = WebSocketManager()
    manager._connections = [client]

    agent = MagicMock()
    agent.run = MagicMock(side_effect=[_mock_run("1. Only step"), _mock_run("Done")])

    with (
        patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
        patch("infrastructure.ws_manager.handler.ws_manager", manager),
    ):
        outcome = await run_agentic_loop("One step task", max_steps=1)

    completion_events = [
        evt for evt in _collect_events(client) if evt["event"] == "agentic.task_complete"
    ]
    assert len(completion_events) == 1
    payload = completion_events[0]["data"]
    assert payload["status"] == "completed"
    # The broadcast carries the same id that the caller gets back.
    assert payload["task_id"] == outcome.task_id
@pytest.mark.asyncio
async def test_all_event_types_received_in_order():
    """Client receives plan_ready → step_complete(s) → task_complete in order."""
    client = _fake_ws()
    manager = WebSocketManager()
    manager._connections = [client]

    scripted_replies = [
        _mock_run("1. First\n2. Second"),
        _mock_run("First done"),
        _mock_run("Second done"),
    ]
    agent = MagicMock()
    agent.run = MagicMock(side_effect=scripted_replies)

    with (
        patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
        patch("infrastructure.ws_manager.handler.ws_manager", manager),
    ):
        await run_agentic_loop("Two steps", max_steps=2)

    expected_sequence = [
        "agentic.plan_ready",
        "agentic.step_complete",
        "agentic.step_complete",
        "agentic.task_complete",
    ]
    received_sequence = [evt["event"] for evt in _collect_events(client)]
    assert received_sequence == expected_sequence
@pytest.mark.asyncio
async def test_adapted_step_broadcasts_step_adapted():
    """When a step fails and adapts, client receives agentic.step_adapted."""
    client = _fake_ws()
    manager = WebSocketManager()
    manager._connections = [client]

    # Plan, then a failing step execution, then a successful retry.
    scripted_replies = [
        _mock_run("1. Risky step"),
        Exception("network error"),
        _mock_run("Adapted approach worked"),
    ]
    agent = MagicMock()
    agent.run = MagicMock(side_effect=scripted_replies)

    with (
        patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
        patch("infrastructure.ws_manager.handler.ws_manager", manager),
    ):
        await run_agentic_loop("Risky task", max_steps=1)

    received = _collect_events(client)
    assert "agentic.step_adapted" in [evt["event"] for evt in received]
    adapted_events = [evt for evt in received if evt["event"] == "agentic.step_adapted"]
    assert adapted_events[0]["data"]["error"] == "network error"
# ---------------------------------------------------------------------------
# Resilience: WS disconnect mid-loop
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_ws_disconnect_mid_loop_does_not_crash():
    """If a WS client disconnects during the loop, the loop completes."""
    client = _fake_ws()
    # First send succeeds (plan_ready), then WS dies
    client.send_text = AsyncMock(side_effect=[None, ConnectionError("gone")])
    manager = WebSocketManager()
    manager._connections = [client]

    scripted_replies = [
        _mock_run("1. Step A\n2. Step B"),
        _mock_run("A done"),
        _mock_run("B done"),
    ]
    agent = MagicMock()
    agent.run = MagicMock(side_effect=scripted_replies)

    with (
        patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
        patch("infrastructure.ws_manager.handler.ws_manager", manager),
    ):
        outcome = await run_agentic_loop("Survive disconnect", max_steps=2)

    # Loop completed despite WS failure
    assert outcome.status == "completed"
    assert len(outcome.steps) == 2
@pytest.mark.asyncio
async def test_no_ws_connections_does_not_crash():
    """Loop completes normally when no WS clients are connected."""
    # No connections at all
    manager = WebSocketManager()

    agent = MagicMock()
    agent.run = MagicMock(side_effect=[_mock_run("1. Solo step"), _mock_run("Done alone")])

    with (
        patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
        patch("infrastructure.ws_manager.handler.ws_manager", manager),
    ):
        outcome = await run_agentic_loop("No audience", max_steps=1)

    assert outcome.status == "completed"
    assert len(outcome.steps) == 1
@pytest.mark.asyncio
async def test_multiple_ws_clients_all_receive_events():
    """All connected WS clients receive the same broadcast events."""
    first_client = _fake_ws()
    second_client = _fake_ws()
    manager = WebSocketManager()
    manager._connections = [first_client, second_client]

    agent = MagicMock()
    agent.run = MagicMock(
        side_effect=[_mock_run("1. Shared step"), _mock_run("Shared result")]
    )

    with (
        patch("timmy.agentic_loop._get_loop_agent", return_value=agent),
        patch("infrastructure.ws_manager.handler.ws_manager", manager),
    ):
        await run_agentic_loop("Broadcast to all", max_steps=1)

    first_names = [evt["event"] for evt in _collect_events(first_client)]
    second_names = [evt["event"] for evt in _collect_events(second_client)]
    assert first_names == second_names
    for required in ("agentic.plan_ready", "agentic.task_complete"):
        assert required in first_names

View File

@@ -0,0 +1,319 @@
"""Unit tests for timmy.agentic_loop — agentic loop data structures, parsing, and execution."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.agentic_loop import (
AgenticResult,
AgenticStep,
_broadcast_progress,
_parse_steps,
run_agentic_loop,
)
# ── Data structures ──────────────────────────────────────────────────────────
class TestAgenticStep:
    """Construction check for the AgenticStep record type."""

    def test_fields(self):
        """Every constructor argument is readable back from the same-named attribute."""
        expected = {
            "step_num": 1,
            "description": "Do something",
            "result": "Done",
            "status": "completed",
            "duration_ms": 42,
        }
        record = AgenticStep(**expected)
        for field_name, value in expected.items():
            assert getattr(record, field_name) == value
class TestAgenticResult:
    """Construction checks for the AgenticResult record type."""

    def test_defaults(self):
        """Omitted fields default to no steps, status "completed" and 0 ms."""
        outcome = AgenticResult(task_id="abc", task="test task", summary="ok")
        assert outcome.steps == []
        assert outcome.status == "completed"
        assert outcome.total_duration_ms == 0

    def test_with_steps(self):
        """An explicitly supplied step list is kept on the result."""
        # AgenticStep is built positionally here, unlike the keyword form above.
        only_step = AgenticStep(1, "s", "r", "completed", 10)
        outcome = AgenticResult(task_id="x", task="t", summary="s", steps=[only_step])
        assert len(outcome.steps) == 1
# ── _parse_steps ─────────────────────────────────────────────────────────────
class TestParseSteps:
    """Checks of _parse_steps against numbered and un-numbered plan text."""

    def test_numbered_dot(self):
        """Lines of the form "N. text" become one step each."""
        parsed = _parse_steps("1. First step\n2. Second step\n3. Third step")
        assert parsed == ["First step", "Second step", "Third step"]

    def test_numbered_paren(self):
        """Lines of the form "N) text" become one step each."""
        parsed = _parse_steps("1) Alpha\n2) Beta")
        assert parsed == ["Alpha", "Beta"]

    def test_mixed_whitespace(self):
        """Surrounding whitespace is stripped from numbered steps."""
        parsed = _parse_steps(" 1. Indented step\n 2. Another ")
        assert parsed == ["Indented step", "Another"]

    def test_fallback_plain_lines(self):
        """Without any numbering, each line is taken as a step."""
        parsed = _parse_steps("Do this\nDo that\nDo the other")
        assert parsed == ["Do this", "Do that", "Do the other"]

    def test_empty_string(self):
        """Empty input yields no steps."""
        parsed = _parse_steps("")
        assert parsed == []

    def test_blank_lines_skipped_in_fallback(self):
        """Empty and whitespace-only lines are dropped in the fallback path."""
        parsed = _parse_steps("line one\n\nline two\n \nline three")
        assert parsed == ["line one", "line two", "line three"]
# ── _get_loop_agent ──────────────────────────────────────────────────────────
class TestGetLoopAgent:
    """Tests for the lazily created, module-level agent singleton.

    The module global is swapped with ``patch.object`` so the value present
    before the test is put back afterwards, whether or not the test passes.
    """

    def test_creates_agent_once(self):
        """With an empty singleton the factory's agent is returned, then cached."""
        import timmy.agentic_loop as al

        factory_product = MagicMock()
        with (
            patch.object(al, "_loop_agent", None),
            patch("timmy.agent.create_timmy", return_value=factory_product),
        ):
            first = al._get_loop_agent()
            assert first is factory_product
            # Second call returns cached
            second = al._get_loop_agent()
            assert second is factory_product

    def test_returns_cached(self):
        """An already populated singleton is returned unchanged."""
        import timmy.agentic_loop as al

        marker = object()
        with patch.object(al, "_loop_agent", marker):
            assert al._get_loop_agent() is marker
# ── _broadcast_progress ──────────────────────────────────────────────────────
class TestBroadcastProgress:
    """Tests for the best-effort WebSocket broadcast helper."""

    @pytest.mark.asyncio
    async def test_success(self):
        """The event name and payload are forwarded to ``ws_manager.broadcast``."""
        fake_manager = AsyncMock()
        fake_handler_module = MagicMock(ws_manager=fake_manager)
        module_override = patch.dict(
            "sys.modules",
            {"infrastructure.ws_manager.handler": fake_handler_module},
        )
        # create=True lets the attribute be patched even if the module does
        # not define it.
        attribute_override = patch("timmy.agentic_loop.ws_manager", fake_manager, create=True)

        with attribute_override, module_override:
            await _broadcast_progress("test.event", {"key": "val"})

        fake_manager.broadcast.assert_awaited_once_with("test.event", {"key": "val"})

    @pytest.mark.asyncio
    async def test_import_error_swallowed(self):
        """A handler module that cannot be imported does not make the helper raise."""
        # A None entry in sys.modules makes importing that name fail.
        blocked_import = patch.dict("sys.modules", {"infrastructure.ws_manager.handler": None})
        with blocked_import:
            # Should not raise
            await _broadcast_progress("test.event", {})
# ── run_agentic_loop ─────────────────────────────────────────────────────────
def _make_mock_agent(plan_text, step_responses=None):
"""Create a mock agent whose .run returns predictable content."""
call_count = 0
def run_side_effect(prompt, *, stream=False, session_id=""):
nonlocal call_count
call_count += 1
resp = MagicMock()
if call_count == 1:
# Planning call
resp.content = plan_text
else:
idx = call_count - 2 # step index (0-based)
if step_responses and idx < len(step_responses):
val = step_responses[idx]
if isinstance(val, Exception):
raise val
resp.content = val
else:
resp.content = f"Step result {call_count}"
return resp
agent = MagicMock()
agent.run = MagicMock(side_effect=run_side_effect)
return agent
@pytest.fixture
def _patch_broadcast():
    """Replace ``_broadcast_progress`` with an AsyncMock while the test runs."""
    patcher = patch("timmy.agentic_loop._broadcast_progress", new_callable=AsyncMock)
    patcher.start()
    try:
        yield
    finally:
        # Undo the patch during fixture teardown.
        patcher.stop()
@pytest.fixture
def _patch_clean_response():
    """Replace ``timmy.session._clean_response`` with a pass-through mock."""
    patcher = patch("timmy.session._clean_response", side_effect=lambda x: x)
    patcher.start()
    try:
        yield
    finally:
        # Undo the patch during fixture teardown.
        patcher.stop()
class TestRunAgenticLoop:
    """Checks of run_agentic_loop driven by a scripted mock agent.

    Each test swaps in a mock agent and a stand-in ``config`` module; the
    ``_patch_broadcast`` / ``_patch_clean_response`` fixtures mock out
    broadcasting and response cleaning.
    """

    @pytest.mark.asyncio
    async def test_successful_execution(self, _patch_broadcast, _patch_clean_response):
        """A two-step plan whose steps both succeed yields a completed result."""
        scripted_agent = _make_mock_agent("1. Step A\n2. Step B", ["Result A", "Result B"])
        fake_settings = MagicMock()
        fake_settings.max_agent_steps = 10
        fake_config = MagicMock(settings=fake_settings)

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch("timmy.agentic_loop.settings", fake_settings, create=True),
            patch.dict("sys.modules", {"config": fake_config}),
        ):
            outcome = await run_agentic_loop("do stuff", max_steps=5)

        assert outcome.status == "completed"
        assert len(outcome.steps) == 2
        first_step = outcome.steps[0]
        assert first_step.status == "completed"
        assert first_step.description == "Step A"
        assert outcome.total_duration_ms >= 0

    @pytest.mark.asyncio
    async def test_planning_failure(self, _patch_broadcast):
        """An exception from the planning call produces a failed result."""
        broken_agent = MagicMock()
        broken_agent.run = MagicMock(side_effect=RuntimeError("LLM down"))
        fake_settings = MagicMock()
        fake_settings.max_agent_steps = 5
        fake_config = MagicMock(settings=fake_settings)

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=broken_agent),
            patch.dict("sys.modules", {"config": fake_config}),
        ):
            outcome = await run_agentic_loop("do stuff", max_steps=3)

        assert outcome.status == "failed"
        assert "Planning failed" in outcome.summary

    @pytest.mark.asyncio
    async def test_empty_plan(self, _patch_broadcast):
        """A plan with no content produces a failed result mentioning "no steps"."""
        scripted_agent = _make_mock_agent("")
        fake_settings = MagicMock()
        fake_settings.max_agent_steps = 5
        fake_config = MagicMock(settings=fake_settings)

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch.dict("sys.modules", {"config": fake_config}),
        ):
            outcome = await run_agentic_loop("do stuff", max_steps=3)

        assert outcome.status == "failed"
        assert "no steps" in outcome.summary.lower()

    @pytest.mark.asyncio
    async def test_step_failure_triggers_adaptation(self, _patch_broadcast, _patch_clean_response):
        """A step that raises leads to at least one step recorded as "adapted"."""
        scripted_agent = _make_mock_agent(
            "1. Do X\n2. Do Y",
            [RuntimeError("oops"), "Adapted result", "Y done"],
        )
        fake_settings = MagicMock()
        fake_settings.max_agent_steps = 10
        fake_config = MagicMock(settings=fake_settings)

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch.dict("sys.modules", {"config": fake_config}),
        ):
            outcome = await run_agentic_loop("do stuff", max_steps=5)

        # Step 1 should be adapted, step 2 completed
        recorded_statuses = [step.status for step in outcome.steps]
        assert "adapted" in recorded_statuses

    @pytest.mark.asyncio
    async def test_truncation_marks_partial(self, _patch_broadcast, _patch_clean_response):
        """A five-step plan run with max_steps=2 is reported as partial."""
        scripted_agent = _make_mock_agent(
            "1. A\n2. B\n3. C\n4. D\n5. E",
            ["r1", "r2"],
        )
        fake_settings = MagicMock()
        fake_settings.max_agent_steps = 10
        fake_config = MagicMock(settings=fake_settings)

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch.dict("sys.modules", {"config": fake_config}),
        ):
            outcome = await run_agentic_loop("do stuff", max_steps=2)

        assert outcome.status == "partial"

    @pytest.mark.asyncio
    async def test_on_progress_callback(self, _patch_broadcast, _patch_clean_response):
        """on_progress is awaited once with (description, step number, total)."""
        scripted_agent = _make_mock_agent("1. Only step", ["done"])
        fake_settings = MagicMock()
        fake_settings.max_agent_steps = 10
        fake_config = MagicMock(settings=fake_settings)
        progress_spy = AsyncMock()

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch.dict("sys.modules", {"config": fake_config}),
        ):
            outcome = await run_agentic_loop("do stuff", max_steps=5, on_progress=progress_spy)

        progress_spy.assert_awaited_once_with("Only step", 1, 1)
        assert outcome.status == "completed"

    @pytest.mark.asyncio
    async def test_default_max_steps_from_settings(self, _patch_broadcast, _patch_clean_response):
        """The loop completes when max_steps is omitted and config is mocked."""
        scripted_agent = _make_mock_agent("1. S1", ["r1"])
        fake_settings = MagicMock()
        fake_settings.max_agent_steps = 3
        fake_config = MagicMock(settings=fake_settings)

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=scripted_agent),
            patch.dict("sys.modules", {"config": fake_config}),
        ):
            outcome = await run_agentic_loop("do stuff")  # max_steps=0 → from settings

        assert outcome.status == "completed"

    @pytest.mark.asyncio
    async def test_failed_step_and_failed_adaptation(self, _patch_broadcast, _patch_clean_response):
        """When both step and adaptation fail, step is marked failed."""
        invocations = 0

        def always_failing_after_plan(prompt, *, stream=False, session_id=""):
            nonlocal invocations
            invocations += 1
            if invocations != 1:
                # Both step execution and adaptation fail
                raise RuntimeError("everything broken")
            plan_reply = MagicMock()
            plan_reply.content = "1. Only step"
            return plan_reply

        doomed_agent = MagicMock()
        doomed_agent.run = MagicMock(side_effect=always_failing_after_plan)
        fake_settings = MagicMock()
        fake_settings.max_agent_steps = 10
        fake_config = MagicMock(settings=fake_settings)

        with (
            patch("timmy.agentic_loop._get_loop_agent", return_value=doomed_agent),
            patch.dict("sys.modules", {"config": fake_config}),
        ):
            outcome = await run_agentic_loop("do stuff", max_steps=5)

        failed_step = outcome.steps[0]
        assert failed_step.status == "failed"
        assert "Failed" in failed_step.result
        assert outcome.status == "partial"

View File

@@ -0,0 +1,162 @@
"""Unit tests for timmy.memory.embeddings — embedding, similarity, and keyword overlap."""
import math
from unittest.mock import MagicMock, patch
import pytest
import timmy.memory.embeddings as emb
from timmy.memory.embeddings import (
_keyword_overlap,
_simple_hash_embedding,
cosine_similarity,
embed_text,
)
# ── _simple_hash_embedding ──────────────────────────────────────────────────
class TestSimpleHashEmbedding:
    """Properties expected of the hash-based fallback embedding."""

    def test_returns_128_dim_vector(self):
        """The embedding has 128 components."""
        assert len(_simple_hash_embedding("hello world")) == 128

    def test_normalized(self):
        """A non-empty text maps to a vector of unit Euclidean length."""
        vector = _simple_hash_embedding("some text for embedding")
        magnitude = math.sqrt(sum(component * component for component in vector))
        assert magnitude == pytest.approx(1.0, abs=1e-6)

    def test_deterministic(self):
        """The same text always yields the same vector."""
        first = _simple_hash_embedding("same input")
        second = _simple_hash_embedding("same input")
        assert first == second

    def test_different_texts_differ(self):
        """Two unrelated texts yield different vectors."""
        greeting = _simple_hash_embedding("hello world")
        farewell = _simple_hash_embedding("goodbye moon")
        assert greeting != farewell

    def test_empty_string(self):
        """Empty text yields a 128-component all-zero vector."""
        vector = _simple_hash_embedding("")
        assert len(vector) == 128
        # All zeros normalised stays zero (mag fallback to 1.0)
        for component in vector:
            assert component == 0.0

    def test_long_text_truncates_at_50_words(self):
        """Words beyond 50 should not change the result."""
        fifty_words = " ".join(f"word{i}" for i in range(50))
        with_extras = fifty_words + " extra1 extra2 extra3"
        assert _simple_hash_embedding(fifty_words) == _simple_hash_embedding(with_extras)
# ── cosine_similarity ────────────────────────────────────────────────────────
class TestCosineSimilarity:
    """Checks of cosine_similarity on small hand-picked vectors."""

    def test_identical_vectors(self):
        """A vector compared with itself scores 1."""
        vector = [1.0, 2.0, 3.0]
        score = cosine_similarity(vector, vector)
        assert score == pytest.approx(1.0)

    def test_orthogonal_vectors(self):
        """Perpendicular vectors score 0."""
        score = cosine_similarity([1.0, 0.0], [0.0, 1.0])
        assert score == pytest.approx(0.0)

    def test_opposite_vectors(self):
        """Vectors pointing in opposite directions score -1."""
        score = cosine_similarity([1.0, 0.0], [-1.0, 0.0])
        assert score == pytest.approx(-1.0)

    def test_zero_vector_returns_zero(self):
        """A zero vector on either side yields exactly 0.0."""
        zero, other = [0.0, 0.0], [1.0, 2.0]
        assert cosine_similarity(zero, other) == 0.0
        assert cosine_similarity(other, zero) == 0.0

    def test_both_zero_vectors(self):
        """Two zero vectors yield exactly 0.0."""
        assert cosine_similarity([0.0], [0.0]) == 0.0
# ── _keyword_overlap ─────────────────────────────────────────────────────────
class TestKeywordOverlap:
    """Checks of the _keyword_overlap score between a query and a text."""

    def test_full_overlap(self):
        """Identical query and text score 1."""
        score = _keyword_overlap("hello world", "hello world")
        assert score == pytest.approx(1.0)

    def test_partial_overlap(self):
        """One shared word out of a two-word query scores 0.5."""
        score = _keyword_overlap("hello world", "hello moon")
        assert score == pytest.approx(0.5)

    def test_no_overlap(self):
        """No shared words scores 0."""
        score = _keyword_overlap("hello", "goodbye")
        assert score == pytest.approx(0.0)

    def test_empty_query(self):
        """An empty query yields exactly 0.0."""
        score = _keyword_overlap("", "anything")
        assert score == 0.0

    def test_case_insensitive(self):
        """Letter case is ignored when matching words."""
        score = _keyword_overlap("Hello World", "hello world")
        assert score == pytest.approx(1.0)
# ── embed_text ───────────────────────────────────────────────────────────────
class TestEmbedText:
    """Checks of embed_text with the module-level EMBEDDING_MODEL overridden."""

    def setup_method(self):
        """Remember the current EMBEDDING_MODEL and clear it for the test."""
        self._saved_model = emb.EMBEDDING_MODEL
        emb.EMBEDDING_MODEL = None

    def teardown_method(self):
        """Put back the EMBEDDING_MODEL that was present before the test."""
        emb.EMBEDDING_MODEL = self._saved_model

    def test_uses_fallback_when_model_disabled(self):
        """With EMBEDDING_MODEL set to False a 128-component vector is returned."""
        emb.EMBEDDING_MODEL = False
        vector = embed_text("test")
        assert len(vector) == 128  # hash fallback dimension

    def test_uses_model_when_available(self):
        """With a model installed, its encode(...).tolist() output is returned."""
        fake_model = MagicMock()
        # encode() returns an auto-created mock; configure its tolist() result.
        fake_model.encode.return_value.tolist.return_value = [0.1, 0.2, 0.3]
        emb.EMBEDDING_MODEL = fake_model

        vector = embed_text("test")

        assert vector == pytest.approx([0.1, 0.2, 0.3])
        fake_model.encode.assert_called_once_with("test")
# ── _get_embedding_model ─────────────────────────────────────────────────────
class TestGetEmbeddingModel:
    """Checks of _get_embedding_model's skip, fallback and caching behaviour."""

    def setup_method(self):
        """Remember the current EMBEDDING_MODEL and clear it for the test."""
        self._saved_model = emb.EMBEDDING_MODEL
        emb.EMBEDDING_MODEL = None

    def teardown_method(self):
        """Put back the EMBEDDING_MODEL that was present before the test."""
        emb.EMBEDDING_MODEL = self._saved_model

    def test_skip_embeddings_setting(self):
        """With timmy_skip_embeddings enabled the loader returns False."""
        fake_settings = MagicMock()
        fake_settings.timmy_skip_embeddings = True
        fake_config = MagicMock(settings=fake_settings)

        with patch.dict("sys.modules", {"config": fake_config}):
            emb.EMBEDDING_MODEL = None
            loaded = emb._get_embedding_model()

        assert loaded is False

    def test_fallback_when_transformers_missing(self):
        """With sentence_transformers unimportable the loader returns False."""
        fake_settings = MagicMock()
        fake_settings.timmy_skip_embeddings = False
        module_overrides = {
            "config": MagicMock(settings=fake_settings),
            # A None entry makes importing sentence_transformers fail.
            "sentence_transformers": None,
        }

        with patch.dict("sys.modules", module_overrides):
            emb.EMBEDDING_MODEL = None
            loaded = emb._get_embedding_model()

        assert loaded is False

    def test_returns_cached_model(self):
        """An already populated EMBEDDING_MODEL is returned unchanged."""
        marker = object()
        emb.EMBEDDING_MODEL = marker
        assert emb._get_embedding_model() is marker