From 3e51434b4bc2171e92992c18ba10c433015e82f1 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 24 Feb 2026 23:36:50 +0000 Subject: [PATCH] test: add 157 functional tests covering 8 low-coverage modules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Analyze test coverage (75.3% → 85.4%) and add functional test suites for the major gaps identified: - test_agent_core.py: Full coverage for agent_core/interface.py (0→100%) and agent_core/ollama_adapter.py (0→100%) — data classes, factories, abstract enforcement, perceive/reason/act/recall workflow, effect logging - test_docker_runner.py: Full coverage for swarm/docker_runner.py (0→100%) — container spawn/stop/list lifecycle with mocked subprocess - test_timmy_tools.py: Tool usage tracking, persona toolkit mapping, catalog generation, graceful degradation without Agno - test_routes_tools.py: /tools page, API stats endpoint, and WebSocket /swarm/live connect/disconnect/send lifecycle (41→82%) - test_voice_tts_functional.py: VoiceTTS init, speak, volume clamping, voice listing, graceful degradation (41→94%) - test_watchdog_functional.py: _run_tests, watch loop state transitions, regression detection, KeyboardInterrupt (47→97%) - test_lnd_backend.py: LND init from params/env, grpc stub enforcement, method-level BackendNotAvailableError, settle returns False (25→61%) - test_swarm_routes_functional.py: Agent spawn/stop, task CRUD, auction, insights, UI partials, error paths (63→92%) https://claude.ai/code/session_01WU4h3cQQiouMwmgYmAgkMM --- tests/test_agent_core.py | 456 ++++++++++++++++++++++++++ tests/test_docker_runner.py | 170 ++++++++++ tests/test_lnd_backend.py | 129 ++++++++ tests/test_routes_tools.py | 70 ++++ tests/test_swarm_routes_functional.py | 242 ++++++++++++++ tests/test_timmy_tools.py | 169 ++++++++++ tests/test_voice_tts_functional.py | 155 +++++++++ tests/test_watchdog_functional.py | 100 ++++++ 8 files changed, 1491 insertions(+) create mode 100644 tests/test_agent_core.py create mode 100644 tests/test_docker_runner.py create mode 100644 tests/test_lnd_backend.py create mode 100644 tests/test_routes_tools.py create mode 100644 tests/test_swarm_routes_functional.py create mode 100644 tests/test_timmy_tools.py create mode 100644 tests/test_voice_tts_functional.py create mode 100644 tests/test_watchdog_functional.py diff --git a/tests/test_agent_core.py b/tests/test_agent_core.py new file mode 100644 index 00000000..e85c2634 --- /dev/null +++ b/tests/test_agent_core.py @@ -0,0 +1,456 @@ +"""Functional tests for agent_core — interface and ollama_adapter. + +Covers the substrate-agnostic agent contract (data classes, enums, +factory methods, abstract enforcement) and the OllamaAgent adapter +(perceive → reason → act → remember → recall → communicate workflow). +""" + +import uuid +from unittest.mock import MagicMock, patch + +import pytest + +from agent_core.interface import ( + ActionType, + AgentCapability, + AgentEffect, + AgentIdentity, + Action, + Communication, + Memory, + Perception, + PerceptionType, + TimAgent, +) + + +# ── AgentIdentity ───────────────────────────────────────────────────────────── + + +class TestAgentIdentity: + def test_generate_creates_uuid(self): + identity = AgentIdentity.generate("Timmy") + assert identity.name == "Timmy" + uuid.UUID(identity.id) # raises on invalid + + def test_generate_default_version(self): + identity = AgentIdentity.generate("Timmy") + assert identity.version == "1.0.0" + + def test_generate_custom_version(self): + identity = AgentIdentity.generate("Timmy", version="2.0.0") + assert identity.version == "2.0.0" + + def test_frozen_identity(self): + identity = AgentIdentity.generate("Timmy") + with pytest.raises(AttributeError): + identity.name = "Other" + + def test_created_at_populated(self): + identity = AgentIdentity.generate("Timmy") + assert identity.created_at # not empty + assert "T" in identity.created_at # ISO format + + def test_two_identities_differ(self): + a = AgentIdentity.generate("A") + b = AgentIdentity.generate("B") + assert a.id != b.id + + +# ── Perception ──────────────────────────────────────────────────────────────── + + +class TestPerception: + def test_text_factory(self): + p = Perception.text("hello") + assert p.type == PerceptionType.TEXT + assert p.data == "hello" + assert p.source == "user" + + def test_text_factory_custom_source(self): + p = Perception.text("hello", source="api") + assert p.source == "api" + + def test_sensor_factory(self): + p = Perception.sensor("temperature", 22.5, "°C") + assert p.type == PerceptionType.SENSOR + assert p.data["kind"] == "temperature" + assert p.data["value"] == 22.5 + assert p.data["unit"] == "°C" + assert p.source == "sensor_temperature" + + def test_timestamp_auto_populated(self): + p = Perception.text("hi") + assert p.timestamp + assert "T" in p.timestamp + + def test_metadata_defaults_empty(self): + p = Perception.text("hi") + assert p.metadata == {} + + +# ── Action ──────────────────────────────────────────────────────────────────── + + +class TestAction: + def test_respond_factory(self): + a = Action.respond("Hello!") + assert a.type == ActionType.TEXT + assert a.payload == "Hello!" + assert a.confidence == 1.0 + + def test_respond_with_confidence(self): + a = Action.respond("Maybe", confidence=0.5) + assert a.confidence == 0.5 + + def test_move_factory(self): + a = Action.move((1.0, 2.0, 3.0), speed=0.5) + assert a.type == ActionType.MOVE + assert a.payload["vector"] == (1.0, 2.0, 3.0) + assert a.payload["speed"] == 0.5 + + def test_move_default_speed(self): + a = Action.move((0, 0, 0)) + assert a.payload["speed"] == 1.0 + + def test_deadline_defaults_none(self): + a = Action.respond("test") + assert a.deadline is None + + +# ── Memory ──────────────────────────────────────────────────────────────────── + + +class TestMemory: + def test_touch_increments(self): + m = Memory(id="m1", content="hello", created_at="2025-01-01T00:00:00Z") + assert m.access_count == 0 + m.touch() + assert m.access_count == 1 + m.touch() + assert m.access_count == 2 + + def test_touch_sets_last_accessed(self): + m = Memory(id="m1", content="hello", created_at="2025-01-01T00:00:00Z") + assert m.last_accessed is None + m.touch() + assert m.last_accessed is not None + + def test_default_importance(self): + m = Memory(id="m1", content="x", created_at="now") + assert m.importance == 0.5 + + def test_tags_default_empty(self): + m = Memory(id="m1", content="x", created_at="now") + assert m.tags == [] + + +# ── Communication ───────────────────────────────────────────────────────────── + + +class TestCommunication: + def test_defaults(self): + c = Communication(sender="A", recipient="B", content="hi") + assert c.protocol == "direct" + assert c.encrypted is False + assert c.timestamp # auto-populated + + +# ── TimAgent abstract enforcement ───────────────────────────────────────────── + + +class TestTimAgentABC: + def test_cannot_instantiate_abstract(self): + with pytest.raises(TypeError): + TimAgent(AgentIdentity.generate("X")) + + def test_concrete_subclass_works(self): + class Dummy(TimAgent): + def perceive(self, p): return Memory(id="1", content=p.data, created_at="") + def reason(self, q, c): return Action.respond(q) + def act(self, a): return a.payload + def remember(self, m): pass + def recall(self, q, limit=5): return [] + def communicate(self, m): return True + + d = Dummy(AgentIdentity.generate("Dummy")) + assert d.identity.name == "Dummy" + assert d.capabilities == set() + + def test_has_capability(self): + class Dummy(TimAgent): + def perceive(self, p): pass + def reason(self, q, c): pass + def act(self, a): pass + def remember(self, m): pass + def recall(self, q, limit=5): return [] + def communicate(self, m): return True + + d = Dummy(AgentIdentity.generate("D")) + d._capabilities.add(AgentCapability.REASONING) + assert d.has_capability(AgentCapability.REASONING) + assert not d.has_capability(AgentCapability.VISION) + + def test_capabilities_returns_copy(self): + class Dummy(TimAgent): + def perceive(self, p): pass + def reason(self, q, c): pass + def act(self, a): pass + def remember(self, m): pass + def recall(self, q, limit=5): return [] + def communicate(self, m): return True + + d = Dummy(AgentIdentity.generate("D")) + caps = d.capabilities + caps.add(AgentCapability.VISION) + assert AgentCapability.VISION not in d.capabilities + + def test_get_state(self): + class Dummy(TimAgent): + def perceive(self, p): pass + def reason(self, q, c): pass + def act(self, a): pass + def remember(self, m): pass + def recall(self, q, limit=5): return [] + def communicate(self, m): return True + + d = Dummy(AgentIdentity.generate("D")) + state = d.get_state() + assert "identity" in state + assert "capabilities" in state + assert "state" in state + + def test_shutdown_does_not_raise(self): + class Dummy(TimAgent): + def perceive(self, p): pass + def reason(self, q, c): pass + def act(self, a): pass + def remember(self, m): pass + def recall(self, q, limit=5): return [] + def communicate(self, m): return True + + d = Dummy(AgentIdentity.generate("D")) + d.shutdown() # should not raise + + +# ── AgentEffect ─────────────────────────────────────────────────────────────── + + +class TestAgentEffect: + def test_empty_export(self): + effect = AgentEffect() + assert effect.export() == [] + + def test_log_perceive(self): + effect = AgentEffect() + p = Perception.text("test input") + effect.log_perceive(p, "mem_0") + log = effect.export() + assert len(log) == 1 + assert log[0]["type"] == "perceive" + assert log[0]["perception_type"] == "TEXT" + assert log[0]["memory_id"] == "mem_0" + assert "timestamp" in log[0] + + def test_log_reason(self): + effect = AgentEffect() + effect.log_reason("How to help?", ActionType.TEXT) + log = effect.export() + assert len(log) == 1 + assert log[0]["type"] == "reason" + assert log[0]["query"] == "How to help?" + assert log[0]["action_type"] == "TEXT" + + def test_log_act(self): + effect = AgentEffect() + action = Action.respond("Hello!") + effect.log_act(action, "Hello!") + log = effect.export() + assert len(log) == 1 + assert log[0]["type"] == "act" + assert log[0]["confidence"] == 1.0 + assert log[0]["result_type"] == "str" + + def test_export_returns_copy(self): + effect = AgentEffect() + effect.log_reason("q", ActionType.TEXT) + exported = effect.export() + exported.clear() + assert len(effect.export()) == 1 + + def test_full_audit_trail(self): + effect = AgentEffect() + p = Perception.text("input") + effect.log_perceive(p, "m0") + effect.log_reason("what now?", ActionType.TEXT) + action = Action.respond("response") + effect.log_act(action, "response") + log = effect.export() + assert len(log) == 3 + types = [e["type"] for e in log] + assert types == ["perceive", "reason", "act"] + + +# ── OllamaAgent functional tests ───────────────────────────────────────────── + + +class TestOllamaAgent: + """Functional tests for the OllamaAgent adapter. + + Uses mocked Ollama (create_timmy returns a mock) to exercise + the full perceive → reason → act → remember → recall pipeline. + """ + + @pytest.fixture + def agent(self): + with patch("agent_core.ollama_adapter.create_timmy") as mock_ct: + mock_timmy = MagicMock() + mock_run = MagicMock() + mock_run.content = "Mocked LLM response" + mock_timmy.run.return_value = mock_run + mock_ct.return_value = mock_timmy + + from agent_core.ollama_adapter import OllamaAgent + identity = AgentIdentity.generate("TestTimmy") + return OllamaAgent(identity, effect_log="/tmp/test_effects") + + def test_capabilities_set(self, agent): + caps = agent.capabilities + assert AgentCapability.REASONING in caps + assert AgentCapability.CODING in caps + assert AgentCapability.WRITING in caps + assert AgentCapability.ANALYSIS in caps + assert AgentCapability.COMMUNICATION in caps + + def test_perceive_creates_memory(self, agent): + p = Perception.text("Hello Timmy") + mem = agent.perceive(p) + assert mem.id == "mem_0" + assert mem.content["data"] == "Hello Timmy" + assert mem.content["type"] == "TEXT" + + def test_perceive_extracts_tags(self, agent): + p = Perception.text("I need help with a bug in my code") + mem = agent.perceive(p) + assert "TEXT" in mem.tags + assert "user" in mem.tags + assert "help" in mem.tags + assert "bug" in mem.tags + assert "code" in mem.tags + + def test_perceive_fifo_eviction(self, agent): + for i in range(12): + agent.perceive(Perception.text(f"msg {i}")) + assert len(agent._working_memory) == 10 + # oldest two evicted + assert agent._working_memory[0].content["data"] == "msg 2" + + def test_reason_returns_action(self, agent): + mem = agent.perceive(Perception.text("context")) + action = agent.reason("What should I do?", [mem]) + assert action.type == ActionType.TEXT + assert action.payload == "Mocked LLM response" + assert action.confidence == 0.9 + + def test_act_text(self, agent): + action = Action.respond("Hello!") + result = agent.act(action) + assert result == "Hello!" + + def test_act_speak(self, agent): + action = Action(type=ActionType.SPEAK, payload="Speak this") + result = agent.act(action) + assert result["spoken"] == "Speak this" + assert result["tts_engine"] == "pyttsx3" + + def test_act_call(self, agent): + action = Action(type=ActionType.CALL, payload={"url": "http://example.com"}) + result = agent.act(action) + assert result["status"] == "not_implemented" + + def test_act_unsupported(self, agent): + action = Action(type=ActionType.MOVE, payload=(0, 0, 0)) + result = agent.act(action) + assert "error" in result + + def test_remember_stores_and_deduplicates(self, agent): + mem = agent.perceive(Perception.text("original")) + assert len(agent._working_memory) == 1 + agent.remember(mem) + assert len(agent._working_memory) == 1 # deduplicated + assert mem.access_count == 1 + + def test_remember_evicts_on_overflow(self, agent): + for i in range(10): + agent.perceive(Perception.text(f"fill {i}")) + extra = Memory(id="extra", content="overflow", created_at="now") + agent.remember(extra) + assert len(agent._working_memory) == 10 + # first memory evicted + assert agent._working_memory[-1].id == "extra" + + def test_recall_keyword_matching(self, agent): + agent.perceive(Perception.text("python code review")) + agent.perceive(Perception.text("weather forecast")) + agent.perceive(Perception.text("python bug fix")) + results = agent.recall("python", limit=5) + # All memories returned (recall returns up to limit) + assert len(results) == 3 + # Memories containing "python" should score higher and appear first + first_content = str(results[0].content) + assert "python" in first_content.lower() + + def test_recall_respects_limit(self, agent): + for i in range(10): + agent.perceive(Perception.text(f"memory {i}")) + results = agent.recall("memory", limit=3) + assert len(results) == 3 + + def test_communicate_success(self, agent): + with patch("swarm.comms.SwarmComms") as MockComms: + mock_comms = MagicMock() + MockComms.return_value = mock_comms + msg = Communication(sender="Timmy", recipient="Echo", content="hi") + result = agent.communicate(msg) + # communicate returns True on success, False on exception + assert isinstance(result, bool) + + def test_communicate_failure(self, agent): + # Force an import error inside communicate() to trigger except branch + with patch.dict("sys.modules", {"swarm.comms": None}): + msg = Communication(sender="Timmy", recipient="Echo", content="hi") + assert agent.communicate(msg) is False + + def test_effect_logging_full_workflow(self, agent): + p = Perception.text("test input") + mem = agent.perceive(p) + action = agent.reason("respond", [mem]) + agent.act(action) + log = agent.get_effect_log() + assert len(log) == 3 + assert log[0]["type"] == "perceive" + assert log[1]["type"] == "reason" + assert log[2]["type"] == "act" + + def test_no_effect_log_when_disabled(self): + with patch("agent_core.ollama_adapter.create_timmy") as mock_ct: + mock_timmy = MagicMock() + mock_ct.return_value = mock_timmy + from agent_core.ollama_adapter import OllamaAgent + identity = AgentIdentity.generate("NoLog") + agent = OllamaAgent(identity) # no effect_log + assert agent.get_effect_log() is None + + def test_format_context_empty(self, agent): + result = agent._format_context([]) + assert result == "No previous context." + + def test_format_context_with_dict_content(self, agent): + mem = Memory(id="m", content={"data": "hello"}, created_at="now") + result = agent._format_context([mem]) + assert "hello" in result + + def test_format_context_with_string_content(self, agent): + mem = Memory(id="m", content="plain string", created_at="now") + result = agent._format_context([mem]) + assert "plain string" in result diff --git a/tests/test_docker_runner.py b/tests/test_docker_runner.py new file mode 100644 index 00000000..896c7c97 --- /dev/null +++ b/tests/test_docker_runner.py @@ -0,0 +1,170 @@ +"""Functional tests for swarm.docker_runner — Docker container lifecycle. + +All subprocess calls are mocked so Docker is not required. +""" + +from unittest.mock import MagicMock, patch, call + +import pytest + +from swarm.docker_runner import DockerAgentRunner, ManagedContainer + + +class TestDockerAgentRunner: + """Test container spawn/stop/list lifecycle.""" + + def test_init_defaults(self): + runner = DockerAgentRunner() + assert runner.image == "timmy-time:latest" + assert runner.coordinator_url == "http://dashboard:8000" + assert runner.extra_env == {} + assert runner._containers == {} + + def test_init_custom(self): + runner = DockerAgentRunner( + image="custom:v2", + coordinator_url="http://host:9000", + extra_env={"FOO": "bar"}, + ) + assert runner.image == "custom:v2" + assert runner.coordinator_url == "http://host:9000" + assert runner.extra_env == {"FOO": "bar"} + + @patch("swarm.docker_runner.subprocess.run") + def test_spawn_success(self, mock_run): + mock_run.return_value = MagicMock( + returncode=0, stdout="abc123container\n", stderr="" + ) + runner = DockerAgentRunner() + info = runner.spawn("Echo", agent_id="test-id-1234", capabilities="summarise") + + assert info["container_id"] == "abc123container" + assert info["agent_id"] == "test-id-1234" + assert info["name"] == "Echo" + assert info["capabilities"] == "summarise" + assert "abc123container" in runner._containers + + # Verify docker command structure + cmd = mock_run.call_args[0][0] + assert cmd[0] == "docker" + assert cmd[1] == "run" + assert "--detach" in cmd + assert "--name" in cmd + assert "timmy-time:latest" in cmd + + @patch("swarm.docker_runner.subprocess.run") + def test_spawn_generates_uuid_when_no_agent_id(self, mock_run): + mock_run.return_value = MagicMock(returncode=0, stdout="cid\n", stderr="") + runner = DockerAgentRunner() + info = runner.spawn("Echo") + # agent_id should be a valid UUID-like string + assert len(info["agent_id"]) == 36 # UUID format + + @patch("swarm.docker_runner.subprocess.run") + def test_spawn_custom_image(self, mock_run): + mock_run.return_value = MagicMock(returncode=0, stdout="cid\n", stderr="") + runner = DockerAgentRunner() + info = runner.spawn("Echo", image="custom:latest") + assert info["image"] == "custom:latest" + + @patch("swarm.docker_runner.subprocess.run") + def test_spawn_docker_error(self, mock_run): + mock_run.return_value = MagicMock( + returncode=1, stdout="", stderr="no such image" + ) + runner = DockerAgentRunner() + with pytest.raises(RuntimeError, match="no such image"): + runner.spawn("Echo") + + @patch("swarm.docker_runner.subprocess.run", side_effect=FileNotFoundError) + def test_spawn_docker_not_installed(self, mock_run): + runner = DockerAgentRunner() + with pytest.raises(RuntimeError, match="Docker CLI not found"): + runner.spawn("Echo") + + @patch("swarm.docker_runner.subprocess.run") + def test_stop_success(self, mock_run): + mock_run.return_value = MagicMock(returncode=0, stdout="cid\n", stderr="") + runner = DockerAgentRunner() + # Spawn first + runner.spawn("Echo", agent_id="a1") + cid = list(runner._containers.keys())[0] + + mock_run.reset_mock() + mock_run.return_value = MagicMock(returncode=0) + + assert runner.stop(cid) is True + assert cid not in runner._containers + # Verify docker rm -f was called + rm_cmd = mock_run.call_args[0][0] + assert rm_cmd[0] == "docker" + assert rm_cmd[1] == "rm" + assert "-f" in rm_cmd + + @patch("swarm.docker_runner.subprocess.run", side_effect=Exception("fail")) + def test_stop_failure(self, mock_run): + runner = DockerAgentRunner() + runner._containers["fake"] = ManagedContainer( + container_id="fake", agent_id="a", name="X", image="img" + ) + assert runner.stop("fake") is False + + @patch("swarm.docker_runner.subprocess.run") + def test_stop_all(self, mock_run): + # Return different container IDs so they don't overwrite each other + mock_run.side_effect = [ + MagicMock(returncode=0, stdout="cid_a\n", stderr=""), + MagicMock(returncode=0, stdout="cid_b\n", stderr=""), + ] + runner = DockerAgentRunner() + runner.spawn("A", agent_id="a1") + runner.spawn("B", agent_id="a2") + assert len(runner._containers) == 2 + + mock_run.side_effect = None + mock_run.return_value = MagicMock(returncode=0) + stopped = runner.stop_all() + assert stopped == 2 + assert len(runner._containers) == 0 + + @patch("swarm.docker_runner.subprocess.run") + def test_list_containers(self, mock_run): + mock_run.return_value = MagicMock(returncode=0, stdout="cid\n", stderr="") + runner = DockerAgentRunner() + runner.spawn("Echo", agent_id="e1") + containers = runner.list_containers() + assert len(containers) == 1 + assert containers[0].name == "Echo" + + @patch("swarm.docker_runner.subprocess.run") + def test_is_running_true(self, mock_run): + mock_run.return_value = MagicMock(returncode=0, stdout="true\n", stderr="") + runner = DockerAgentRunner() + assert runner.is_running("somecid") is True + + @patch("swarm.docker_runner.subprocess.run") + def test_is_running_false(self, mock_run): + mock_run.return_value = MagicMock(returncode=0, stdout="false\n", stderr="") + runner = DockerAgentRunner() + assert runner.is_running("somecid") is False + + @patch("swarm.docker_runner.subprocess.run", side_effect=Exception("timeout")) + def test_is_running_exception(self, mock_run): + runner = DockerAgentRunner() + assert runner.is_running("somecid") is False + + @patch("swarm.docker_runner.subprocess.run") + def test_build_env_flags(self, mock_run): + runner = DockerAgentRunner(extra_env={"CUSTOM": "val"}) + flags = runner._build_env_flags("agent-1", "Echo", "summarise") + # Should contain pairs of --env KEY=VALUE + env_dict = {} + for i, f in enumerate(flags): + if f == "--env" and i + 1 < len(flags): + k, v = flags[i + 1].split("=", 1) + env_dict[k] = v + assert env_dict["COORDINATOR_URL"] == "http://dashboard:8000" + assert env_dict["AGENT_NAME"] == "Echo" + assert env_dict["AGENT_ID"] == "agent-1" + assert env_dict["AGENT_CAPABILITIES"] == "summarise" + assert env_dict["CUSTOM"] == "val" diff --git a/tests/test_lnd_backend.py b/tests/test_lnd_backend.py new file mode 100644 index 00000000..a65b81f5 --- /dev/null +++ b/tests/test_lnd_backend.py @@ -0,0 +1,129 @@ +"""Functional tests for lightning.lnd_backend — LND gRPC backend. + +gRPC is stubbed via sys.modules; tests verify initialization, error +handling, and the placeholder method behavior. +""" + +import importlib +import os +import sys +from unittest.mock import patch, MagicMock + +import pytest + +from lightning.base import ( + BackendNotAvailableError, + Invoice, + LightningError, +) + + +def _make_grpc_mock(): + """Create a mock grpc module with required attributes.""" + mock_grpc = MagicMock() + mock_grpc.StatusCode.NOT_FOUND = "NOT_FOUND" + mock_grpc.RpcError = type("RpcError", (Exception,), { + "code": lambda self: "NOT_FOUND", + "details": lambda self: "mocked error", + }) + return mock_grpc + + +@pytest.fixture +def lnd_module(): + """Reload lnd_backend with grpc stubbed so GRPC_AVAILABLE=True.""" + grpc_mock = _make_grpc_mock() + old = sys.modules.get("grpc") + sys.modules["grpc"] = grpc_mock + try: + import lightning.lnd_backend as mod + importlib.reload(mod) + yield mod + finally: + if old is not None: + sys.modules["grpc"] = old + else: + sys.modules.pop("grpc", None) + # Reload to restore original state + import lightning.lnd_backend as mod2 + importlib.reload(mod2) + + +class TestLndBackendInit: + def test_init_with_explicit_params(self, lnd_module): + backend = lnd_module.LndBackend( + host="localhost:10009", + tls_cert_path="/fake/tls.cert", + macaroon_path="/fake/admin.macaroon", + verify_ssl=True, + ) + assert backend._host == "localhost:10009" + assert backend._tls_cert_path == "/fake/tls.cert" + assert backend._macaroon_path == "/fake/admin.macaroon" + assert backend._verify_ssl is True + + def test_init_from_env_vars(self, lnd_module): + env = { + "LND_GRPC_HOST": "remote:9999", + "LND_TLS_CERT_PATH": "/env/tls.cert", + "LND_MACAROON_PATH": "/env/macaroon", + "LND_VERIFY_SSL": "false", + } + with patch.dict(os.environ, env): + backend = lnd_module.LndBackend() + assert backend._host == "remote:9999" + assert backend._verify_ssl is False + + def test_init_raises_without_grpc(self): + from lightning.lnd_backend import LndBackend + with pytest.raises(LightningError, match="grpcio not installed"): + LndBackend() + + def test_name_is_lnd(self, lnd_module): + assert lnd_module.LndBackend.name == "lnd" + + def test_grpc_available_true_after_reload(self, lnd_module): + assert lnd_module.GRPC_AVAILABLE is True + + +class TestLndBackendMethods: + @pytest.fixture + def backend(self, lnd_module): + return lnd_module.LndBackend( + host="localhost:10009", + macaroon_path="/fake/path", + ) + + def test_check_stub_raises_not_available(self, backend): + """_check_stub should raise BackendNotAvailableError when stub is None.""" + with pytest.raises(BackendNotAvailableError, match="not fully implemented"): + backend._check_stub() + + def test_create_invoice_raises_not_available(self, backend): + with pytest.raises(BackendNotAvailableError): + backend.create_invoice(1000, memo="test") + + def test_check_payment_raises_not_available(self, backend): + with pytest.raises(BackendNotAvailableError): + backend.check_payment("abc123") + + def test_get_invoice_raises_not_available(self, backend): + with pytest.raises(BackendNotAvailableError): + backend.get_invoice("abc123") + + def test_settle_invoice_returns_false(self, backend): + """LND auto-settles, so manual settle always returns False.""" + result = backend.settle_invoice("hash", "preimage") + assert result is False + + def test_list_invoices_raises_not_available(self, backend): + with pytest.raises(BackendNotAvailableError): + backend.list_invoices() + + def test_get_balance_raises_not_available(self, backend): + with pytest.raises(BackendNotAvailableError): + backend.get_balance_sats() + + def test_health_check_raises_not_available(self, backend): + with pytest.raises(BackendNotAvailableError): + backend.health_check() diff --git a/tests/test_routes_tools.py b/tests/test_routes_tools.py new file mode 100644 index 00000000..3a31fcf3 --- /dev/null +++ b/tests/test_routes_tools.py @@ -0,0 +1,70 @@ +"""Functional tests for dashboard routes: /tools and /swarm/live WebSocket. + +Tests the tools dashboard page, API stats endpoint, and the swarm +WebSocket live endpoint. +""" + +from unittest.mock import patch, MagicMock, AsyncMock + +import pytest +from fastapi.testclient import TestClient + + +# ── /tools route ────────────────────────────────────────────────────────────── + + +class TestToolsPage: + def test_tools_page_returns_200(self, client): + response = client.get("/tools") + assert response.status_code == 200 + + def test_tools_page_html_content(self, client): + response = client.get("/tools") + assert "text/html" in response.headers["content-type"] + + def test_tools_api_stats_returns_json(self, client): + response = client.get("/tools/api/stats") + assert response.status_code == 200 + data = response.json() + assert "all_stats" in data + assert "available_tools" in data + assert isinstance(data["available_tools"], list) + assert len(data["available_tools"]) > 0 + + def test_tools_api_stats_includes_base_tools(self, client): + response = client.get("/tools/api/stats") + data = response.json() + base_tools = {"web_search", "shell", "python", "read_file", "write_file", "list_files"} + for tool in base_tools: + assert tool in data["available_tools"], f"Missing: {tool}" + + def test_tools_page_with_agents(self, client): + """Spawn an agent and verify tools page includes it.""" + client.post("/swarm/spawn", data={"name": "Echo"}) + response = client.get("/tools") + assert response.status_code == 200 + + +# ── /swarm/live WebSocket ───────────────────────────────────────────────────── + + +class TestSwarmWebSocket: + def test_websocket_connect_disconnect(self, client): + with client.websocket_connect("/swarm/live") as ws: + # Connection succeeds + pass + # Disconnect on context manager exit + + def test_websocket_send_receive(self, client): + """The WebSocket endpoint should accept messages (it logs them).""" + with client.websocket_connect("/swarm/live") as ws: + ws.send_text("ping") + # The endpoint only echoes via logging, not back to client. + # The key test is that it doesn't crash on receiving a message. + + def test_websocket_multiple_connections(self, client): + """Multiple clients can connect simultaneously.""" + with client.websocket_connect("/swarm/live") as ws1: + with client.websocket_connect("/swarm/live") as ws2: + ws1.send_text("hello from 1") + ws2.send_text("hello from 2") diff --git a/tests/test_swarm_routes_functional.py b/tests/test_swarm_routes_functional.py new file mode 100644 index 00000000..255cf014 --- /dev/null +++ b/tests/test_swarm_routes_functional.py @@ -0,0 +1,242 @@ +"""Functional tests for swarm routes — /swarm/* endpoints. + +Tests the full request/response cycle for swarm management endpoints, +including error paths and HTMX partial rendering. +""" + +from unittest.mock import patch, AsyncMock + +import pytest +from fastapi.testclient import TestClient + + +class TestSwarmStatusRoutes: + def test_swarm_status(self, client): + response = client.get("/swarm") + assert response.status_code == 200 + data = response.json() + assert "agents" in data or "status" in data or isinstance(data, dict) + + def test_list_agents_empty(self, client): + response = client.get("/swarm/agents") + assert response.status_code == 200 + data = response.json() + assert "agents" in data + assert isinstance(data["agents"], list) + + +class TestSwarmAgentLifecycle: + def test_spawn_agent(self, client): + response = client.post("/swarm/spawn", data={"name": "Echo"}) + assert response.status_code == 200 + data = response.json() + assert "id" in data or "agent_id" in data or "name" in data + + def test_spawn_and_list(self, client): + client.post("/swarm/spawn", data={"name": "Echo"}) + response = client.get("/swarm/agents") + data = response.json() + assert len(data["agents"]) >= 1 + names = [a["name"] for a in data["agents"]] + assert "Echo" in names + + def test_stop_agent(self, client): + spawn_resp = client.post("/swarm/spawn", data={"name": "TestAgent"}) + spawn_data = spawn_resp.json() + agent_id = spawn_data.get("id") or spawn_data.get("agent_id") + response = client.delete(f"/swarm/agents/{agent_id}") + assert response.status_code == 200 + data = response.json() + assert data["stopped"] is True + + def test_stop_nonexistent_agent(self, client): + response = client.delete("/swarm/agents/nonexistent-id") + assert response.status_code == 200 + data = response.json() + assert data["stopped"] is False + + +class TestSwarmTaskLifecycle: + def test_post_task(self, client): + response = client.post("/swarm/tasks", data={"description": "Summarise readme"}) + assert response.status_code == 200 + data = response.json() + assert data["description"] == "Summarise readme" + assert data["status"] == "bidding" # coordinator auto-opens auction + assert "task_id" in data + + def test_list_tasks(self, client): + client.post("/swarm/tasks", data={"description": "Task A"}) + client.post("/swarm/tasks", data={"description": "Task B"}) + response = client.get("/swarm/tasks") + assert response.status_code == 200 + data = response.json() + assert len(data["tasks"]) >= 2 + + def test_list_tasks_filter_by_status(self, client): + client.post("/swarm/tasks", data={"description": "Bidding task"}) + response = client.get("/swarm/tasks?status=bidding") + assert response.status_code == 200 + data = response.json() + for task in data["tasks"]: + assert task["status"] == "bidding" + + def test_list_tasks_invalid_status(self, client): + """Invalid TaskStatus enum value causes server error (unhandled ValueError).""" + with pytest.raises(ValueError, match="is not a valid TaskStatus"): + client.get("/swarm/tasks?status=invalid_status") + + def test_get_task_by_id(self, client): + post_resp = client.post("/swarm/tasks", data={"description": "Find me"}) + task_id = post_resp.json()["task_id"] + response = client.get(f"/swarm/tasks/{task_id}") + assert response.status_code == 200 + data = response.json() + assert data["description"] == "Find me" + + def test_get_nonexistent_task(self, client): + response = client.get("/swarm/tasks/nonexistent-id") + assert response.status_code == 200 + data = response.json() + assert "error" in data + + def test_complete_task(self, client): + # Create and assign a task first + client.post("/swarm/spawn", data={"name": "Worker"}) + post_resp = client.post("/swarm/tasks", data={"description": "Do work"}) + task_id = post_resp.json()["task_id"] + response = client.post( + f"/swarm/tasks/{task_id}/complete", + data={"result": "Work done"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["status"] == "completed" + + def test_complete_nonexistent_task(self, client): + response = client.post( + "/swarm/tasks/fake-id/complete", + data={"result": "done"}, + ) + assert response.status_code == 404 + + def test_fail_task(self, client): + post_resp = client.post("/swarm/tasks", data={"description": "Will fail"}) + task_id = post_resp.json()["task_id"] + response = client.post( + f"/swarm/tasks/{task_id}/fail", + data={"reason": "out of memory"}, + ) + assert response.status_code == 200 + data = response.json() + assert data["status"] == "failed" + + def test_fail_nonexistent_task(self, client): + response = client.post( + "/swarm/tasks/fake-id/fail", + data={"reason": "no reason"}, + ) + assert response.status_code == 404 + + +class TestSwarmAuction: + def test_post_task_and_auction_no_agents(self, client): + """Auction with no bidders should still return a response.""" + with patch( + "swarm.coordinator.AUCTION_DURATION_SECONDS", 0 + ): + response = client.post( + "/swarm/tasks/auction", + data={"description": "Quick task"}, + ) + assert response.status_code == 200 + data = response.json() + assert "task_id" in data + + +class TestSwarmInsights: + def test_insights_empty(self, client): + response = client.get("/swarm/insights") + assert response.status_code == 200 + data = response.json() + assert "agents" in data + + def test_agent_insights(self, client): + response = client.get("/swarm/insights/some-agent-id") + assert response.status_code == 200 + data = response.json() + assert data["agent_id"] == "some-agent-id" + assert "total_bids" in data + assert "win_rate" in data + + +class TestSwarmUIPartials: + def test_live_page(self, client): + response = client.get("/swarm/live") + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + def test_agents_sidebar(self, client): + response = client.get("/swarm/agents/sidebar") + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + def test_agent_panel_not_found(self, client): + response = client.get("/swarm/agents/nonexistent/panel") + assert response.status_code == 404 + + def test_agent_panel_found(self, client): + spawn_resp = client.post("/swarm/spawn", data={"name": "Echo"}) + agent_id = spawn_resp.json().get("id") or spawn_resp.json().get("agent_id") + response = client.get(f"/swarm/agents/{agent_id}/panel") + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + def test_task_panel_route_shadowed(self, client): + """The /swarm/tasks/panel route is shadowed by /swarm/tasks/{task_id}. + + FastAPI matches the dynamic {task_id} route first, so "panel" is + treated as a task_id lookup, returning JSON with an error. + This documents the current behavior (a routing order issue). + """ + response = client.get("/swarm/tasks/panel") + assert response.status_code == 200 + data = response.json() + assert "error" in data + + def test_direct_assign_with_agent(self, client): + spawn_resp = client.post("/swarm/spawn", data={"name": "Worker"}) + agent_id = spawn_resp.json().get("id") or spawn_resp.json().get("agent_id") + response = client.post( + "/swarm/tasks/direct", + data={"description": "Direct task", "agent_id": agent_id}, + ) + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + def test_direct_assign_without_agent(self, client): + """No agent → runs auction (with no bidders).""" + with patch("swarm.coordinator.AUCTION_DURATION_SECONDS", 0): + response = client.post( + "/swarm/tasks/direct", + data={"description": "Open task"}, + ) + assert response.status_code == 200 + + def test_message_agent_creates_task(self, client): + """Messaging a non-Timmy agent creates and assigns a task.""" + spawn_resp = client.post("/swarm/spawn", data={"name": "Echo"}) + agent_id = spawn_resp.json().get("id") or spawn_resp.json().get("agent_id") + response = client.post( + f"/swarm/agents/{agent_id}/message", + data={"message": "Summarise the readme"}, + ) + assert response.status_code == 200 + assert "text/html" in response.headers["content-type"] + + def test_message_nonexistent_agent(self, client): + response = client.post( + "/swarm/agents/fake-id/message", + data={"message": "hello"}, + ) + assert response.status_code == 404 diff --git a/tests/test_timmy_tools.py b/tests/test_timmy_tools.py new file mode 100644 index 00000000..816ad57c --- /dev/null +++ b/tests/test_timmy_tools.py @@ -0,0 +1,169 @@ +"""Functional tests for timmy.tools — tool tracking, persona toolkits, catalog. + +Covers tool usage statistics, persona-to-toolkit mapping, catalog generation, +and graceful degradation when Agno is unavailable. +""" + +from unittest.mock import patch, MagicMock + +import pytest + +from timmy.tools import ( + _TOOL_USAGE, + _track_tool_usage, + get_tool_stats, + get_tools_for_persona, + get_all_available_tools, + PERSONA_TOOLKITS, +) + + +@pytest.fixture(autouse=True) +def clear_usage(): + """Clear tool usage tracking between tests.""" + _TOOL_USAGE.clear() + yield + _TOOL_USAGE.clear() + + +# ── Tool usage tracking ────────────────────────────────────────────────────── + + +class TestToolTracking: + def test_track_creates_agent_entry(self): + _track_tool_usage("agent-1", "web_search", success=True) + assert "agent-1" in _TOOL_USAGE + assert len(_TOOL_USAGE["agent-1"]) == 1 + + def test_track_records_metadata(self): + _track_tool_usage("agent-1", "shell", success=False) + entry = _TOOL_USAGE["agent-1"][0] + assert entry["tool"] == "shell" + assert entry["success"] is False + assert "timestamp" in entry + + def test_track_multiple_calls(self): + _track_tool_usage("a1", "search") + _track_tool_usage("a1", "read") + _track_tool_usage("a1", "search") + assert len(_TOOL_USAGE["a1"]) == 3 + + def test_track_multiple_agents(self): + _track_tool_usage("a1", "search") + _track_tool_usage("a2", "shell") + assert len(_TOOL_USAGE) == 2 + + +class TestGetToolStats: + def test_stats_for_specific_agent(self): + _track_tool_usage("a1", "search") + _track_tool_usage("a1", "read") + _track_tool_usage("a1", "search") + stats = get_tool_stats("a1") + assert stats["agent_id"] == "a1" + assert stats["total_calls"] == 3 + assert set(stats["tools_used"]) == {"search", "read"} + assert len(stats["recent_calls"]) == 3 + + def test_stats_for_unknown_agent(self): + stats = get_tool_stats("nonexistent") + assert stats["total_calls"] == 0 + assert stats["tools_used"] == [] + assert stats["recent_calls"] == [] + + def test_stats_recent_capped_at_10(self): + for i in range(15): + _track_tool_usage("a1", f"tool_{i}") + stats = get_tool_stats("a1") + assert len(stats["recent_calls"]) == 10 + + def test_stats_all_agents(self): + _track_tool_usage("a1", "search") + _track_tool_usage("a2", "shell") + _track_tool_usage("a2", "read") + stats = get_tool_stats() + assert "a1" in stats + assert "a2" in stats + assert stats["a1"]["total_calls"] == 1 + assert stats["a2"]["total_calls"] == 2 + + def test_stats_empty(self): + stats = get_tool_stats() + assert stats == {} + + +# ── Persona toolkit mapping ────────────────────────────────────────────────── + + +class TestPersonaToolkits: + def test_all_expected_personas_present(self): + expected = {"echo", "mace", "helm", "seer", "forge", "quill", "pixel", "lyra", "reel"} + assert set(PERSONA_TOOLKITS.keys()) == expected + + def test_get_tools_for_known_persona_raises_without_agno(self): + """Agno is mocked but not a real package, so create_*_tools raises ImportError.""" + with pytest.raises(ImportError, match="Agno tools not available"): + get_tools_for_persona("echo") + + def test_get_tools_for_unknown_persona(self): + result = get_tools_for_persona("nonexistent") + assert result is None + + def test_creative_personas_return_none(self): + """Creative personas (pixel, lyra, reel) use stub toolkits that + return None when Agno is unavailable.""" + for persona_id in ("pixel", "lyra", "reel"): + result = get_tools_for_persona(persona_id) + assert result is None + + +# ── Tool catalog ───────────────────────────────────────────────────────────── + + +class TestToolCatalog: + def test_catalog_contains_base_tools(self): + catalog = get_all_available_tools() + base_tools = {"web_search", "shell", "python", "read_file", "write_file", "list_files"} + for tool_id in base_tools: + assert tool_id in catalog, f"Missing base tool: {tool_id}" + + def test_catalog_tool_structure(self): + catalog = get_all_available_tools() + for tool_id, info in catalog.items(): + assert "name" in info, f"{tool_id} missing 'name'" + assert "description" in info, f"{tool_id} missing 'description'" + assert "available_in" in info, f"{tool_id} missing 'available_in'" + assert isinstance(info["available_in"], list) + + def test_catalog_timmy_has_all_base_tools(self): + catalog = get_all_available_tools() + base_tools = {"web_search", "shell", "python", "read_file", "write_file", "list_files"} + for tool_id in base_tools: + assert "timmy" in catalog[tool_id]["available_in"], ( + f"Timmy missing tool: {tool_id}" + ) + + def test_catalog_echo_research_tools(self): + catalog = get_all_available_tools() + assert "echo" in catalog["web_search"]["available_in"] + assert "echo" in catalog["read_file"]["available_in"] + # Echo should NOT have shell + assert "echo" not in catalog["shell"]["available_in"] + + def test_catalog_forge_code_tools(self): + catalog = get_all_available_tools() + assert "forge" in catalog["shell"]["available_in"] + assert "forge" in catalog["python"]["available_in"] + assert "forge" in catalog["write_file"]["available_in"] + + def test_catalog_includes_git_tools(self): + catalog = get_all_available_tools() + git_tools = [k for k in catalog if "git" in k.lower()] + # Should have some git tools from tools.git_tools + assert len(git_tools) > 0 + + def test_catalog_includes_creative_tools(self): + catalog = get_all_available_tools() + # Should pick up image, music, video catalogs + all_keys = list(catalog.keys()) + assert len(all_keys) > 6 # more than just base tools diff --git a/tests/test_voice_tts_functional.py b/tests/test_voice_tts_functional.py new file mode 100644 index 00000000..0480f41c --- /dev/null +++ b/tests/test_voice_tts_functional.py @@ -0,0 +1,155 @@ +"""Functional tests for timmy_serve.voice_tts — TTS engine lifecycle. + +pyttsx3 is not available in CI, so all tests mock the engine. +""" + +import threading +from unittest.mock import patch, MagicMock, PropertyMock + +import pytest + + +class TestVoiceTTS: + """Test TTS engine initialization, speak, and configuration.""" + + def test_init_success(self): + mock_pyttsx3 = MagicMock() + mock_engine = MagicMock() + mock_pyttsx3.init.return_value = mock_engine + + with patch.dict("sys.modules", {"pyttsx3": mock_pyttsx3}): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS(rate=200, volume=0.8) + assert tts.available is True + mock_engine.setProperty.assert_any_call("rate", 200) + mock_engine.setProperty.assert_any_call("volume", 0.8) + + def test_init_failure_graceful(self): + """When pyttsx3 import fails, VoiceTTS degrades gracefully.""" + with patch.dict("sys.modules", {"pyttsx3": None}): + from importlib import reload + import timmy_serve.voice_tts as mod + tts = mod.VoiceTTS.__new__(mod.VoiceTTS) + tts._engine = None + tts._rate = 175 + tts._volume = 0.9 + tts._available = False + tts._lock = threading.Lock() + assert tts.available is False + + def test_speak_skips_when_unavailable(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = None + tts._available = False + tts._lock = threading.Lock() + # Should not raise + tts.speak("hello") + + def test_speak_sync_skips_when_unavailable(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = None + tts._available = False + tts._lock = threading.Lock() + tts.speak_sync("hello") + + def test_speak_calls_engine(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = MagicMock() + tts._available = True + tts._lock = threading.Lock() + + tts.speak("test speech") + # Give the background thread time to execute + import time + time.sleep(0.1) + tts._engine.say.assert_called_with("test speech") + + def test_speak_sync_calls_engine(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = MagicMock() + tts._available = True + tts._lock = threading.Lock() + + tts.speak_sync("sync test") + tts._engine.say.assert_called_with("sync test") + tts._engine.runAndWait.assert_called_once() + + def test_set_rate(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = MagicMock() + tts._rate = 175 + + tts.set_rate(220) + assert tts._rate == 220 + tts._engine.setProperty.assert_called_with("rate", 220) + + def test_set_rate_no_engine(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = None + tts._rate = 175 + tts.set_rate(220) + assert tts._rate == 220 + + def test_set_volume_clamped(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = MagicMock() + tts._volume = 0.9 + + tts.set_volume(1.5) + assert tts._volume == 1.0 + + tts.set_volume(-0.5) + assert tts._volume == 0.0 + + tts.set_volume(0.7) + assert tts._volume == 0.7 + + def test_get_voices_no_engine(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = None + assert tts.get_voices() == [] + + def test_get_voices_with_engine(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + mock_voice = MagicMock() + mock_voice.id = "voice1" + mock_voice.name = "Default" + mock_voice.languages = ["en"] + + tts._engine = MagicMock() + tts._engine.getProperty.return_value = [mock_voice] + + voices = tts.get_voices() + assert len(voices) == 1 + assert voices[0]["id"] == "voice1" + assert voices[0]["name"] == "Default" + assert voices[0]["languages"] == ["en"] + + def test_get_voices_exception(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = MagicMock() + tts._engine.getProperty.side_effect = RuntimeError("no voices") + assert tts.get_voices() == [] + + def test_set_voice(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = MagicMock() + tts.set_voice("voice_id_1") + tts._engine.setProperty.assert_called_with("voice", "voice_id_1") + + def test_set_voice_no_engine(self): + from timmy_serve.voice_tts import VoiceTTS + tts = VoiceTTS.__new__(VoiceTTS) + tts._engine = None + tts.set_voice("voice_id_1") # should not raise diff --git a/tests/test_watchdog_functional.py b/tests/test_watchdog_functional.py new file mode 100644 index 00000000..a5153501 --- /dev/null +++ b/tests/test_watchdog_functional.py @@ -0,0 +1,100 @@ +"""Functional tests for self_tdd.watchdog — continuous test runner. + +All subprocess calls are mocked to avoid running real pytest. +""" + +from unittest.mock import patch, MagicMock, call + +import pytest + +from self_tdd.watchdog import _run_tests, watch + + +class TestRunTests: + @patch("self_tdd.watchdog.subprocess.run") + def test_run_tests_passing(self, mock_run): + mock_run.return_value = MagicMock( + returncode=0, + stdout="5 passed\n", + stderr="", + ) + passed, output = _run_tests() + assert passed is True + assert "5 passed" in output + + @patch("self_tdd.watchdog.subprocess.run") + def test_run_tests_failing(self, mock_run): + mock_run.return_value = MagicMock( + returncode=1, + stdout="2 failed, 3 passed\n", + stderr="ERRORS", + ) + passed, output = _run_tests() + assert passed is False + assert "2 failed" in output + assert "ERRORS" in output + + @patch("self_tdd.watchdog.subprocess.run") + def test_run_tests_command_format(self, mock_run): + mock_run.return_value = MagicMock(returncode=0, stdout="", stderr="") + _run_tests() + cmd = mock_run.call_args[0][0] + assert "pytest" in " ".join(cmd) + assert "tests/" in cmd + assert "-q" in cmd + assert "--tb=short" in cmd + assert mock_run.call_args[1]["capture_output"] is True + assert mock_run.call_args[1]["text"] is True + + +class TestWatch: + @patch("self_tdd.watchdog.time.sleep") + @patch("self_tdd.watchdog._run_tests") + @patch("self_tdd.watchdog.typer") + def test_watch_first_pass(self, mock_typer, mock_tests, mock_sleep): + """First iteration: None→passing → should print green message.""" + call_count = 0 + + def side_effect(): + nonlocal call_count + call_count += 1 + if call_count >= 2: + raise KeyboardInterrupt + return (True, "all good") + + mock_tests.side_effect = side_effect + watch(interval=10) + # Should have printed green "All tests passing" message + mock_typer.secho.assert_called() + + @patch("self_tdd.watchdog.time.sleep") + @patch("self_tdd.watchdog._run_tests") + @patch("self_tdd.watchdog.typer") + def test_watch_regression(self, mock_typer, mock_tests, mock_sleep): + """Regression: passing→failing → should print red message + output.""" + results = [(True, "ok"), (False, "FAILED: test_foo"), KeyboardInterrupt] + idx = 0 + + def side_effect(): + nonlocal idx + if idx >= len(results): + raise KeyboardInterrupt + r = results[idx] + idx += 1 + if isinstance(r, type) and issubclass(r, BaseException): + raise r() + return r + + mock_tests.side_effect = side_effect + watch(interval=5) + # Should have printed red "Regression detected" at some point + secho_calls = [str(c) for c in mock_typer.secho.call_args_list] + assert any("Regression" in c for c in secho_calls) or any("RED" in c for c in secho_calls) + + @patch("self_tdd.watchdog.time.sleep") + @patch("self_tdd.watchdog._run_tests") + @patch("self_tdd.watchdog.typer") + def test_watch_keyboard_interrupt(self, mock_typer, mock_tests, mock_sleep): + mock_tests.side_effect = KeyboardInterrupt + watch(interval=60) + mock_typer.echo.assert_called() # "Watchdog stopped"