cleanup: delete dead modules — ~7,900 lines removed

Closes #22, Closes #23

Deleted: brain/, swarm/, openfang/, paperclip/, cascade_adapter,
memory_migrate, agents/timmy.py, dead routes + all corresponding tests.

Updated pyproject.toml, app.py, loop_qa.py for removed imports.
This commit is contained in:
2026-03-14 09:49:24 -04:00
parent dc13b368a5
commit 2f623826bd
56 changed files with 1254 additions and 8182 deletions

View File

@@ -1 +0,0 @@

View File

@@ -1,292 +0,0 @@
"""Tests for brain.client — BrainClient memory + task operations."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from brain.client import DEFAULT_RQLITE_URL, BrainClient
class TestBrainClientInit:
    """Construction defaults and overrides for BrainClient."""

    def test_default_url(self):
        # No argument -> module-level default rqlite endpoint.
        assert BrainClient().rqlite_url == DEFAULT_RQLITE_URL

    def test_custom_url(self):
        custom = BrainClient(rqlite_url="http://custom:4001")
        assert custom.rqlite_url == "http://custom:4001"

    def test_node_id_generated(self):
        # A fresh client must auto-generate a non-empty node id.
        generated = BrainClient()
        assert generated.node_id  # not empty

    def test_custom_node_id(self):
        named = BrainClient(node_id="my-node")
        assert named.node_id == "my-node"

    def test_source_detection(self):
        # Source detection always yields a string, whatever the host looks like.
        detected = BrainClient()
        assert isinstance(detected.source, str)
class TestBrainClientMemory:
    """Test memory operations (remember, recall, get_recent, get_context)."""

    def _make_client(self):
        # Fixed URL and node id keep the mocked HTTP interactions deterministic.
        return BrainClient(rqlite_url="http://test:4001", node_id="test-node")

    async def test_remember_success(self):
        # remember() should surface rqlite's last_insert_id as the memory id.
        client = self._make_client()
        mock_response = MagicMock()
        mock_response.json.return_value = {"results": [{"last_insert_id": 42}]}
        mock_response.raise_for_status = MagicMock()
        client._client = MagicMock()
        client._client.post = AsyncMock(return_value=mock_response)
        with patch("brain.client.BrainClient._detect_source", return_value="test"):
            with patch("brain.embeddings.get_embedder") as mock_emb:
                mock_embedder = MagicMock()
                # 16 zero bytes stand in for a serialized embedding vector.
                mock_embedder.encode_single.return_value = b"\x00" * 16
                mock_emb.return_value = mock_embedder
                result = await client.remember("test memory", tags=["test"])
                assert result["id"] == 42
                assert result["status"] == "stored"

    async def test_remember_failure_raises(self):
        # A transport failure must propagate out of remember(), not be swallowed.
        client = self._make_client()
        client._client = MagicMock()
        client._client.post = AsyncMock(side_effect=Exception("connection refused"))
        with patch("brain.embeddings.get_embedder") as mock_emb:
            mock_embedder = MagicMock()
            mock_embedder.encode_single.return_value = b"\x00" * 16
            mock_emb.return_value = mock_embedder
            with pytest.raises(Exception, match="connection refused"):
                await client.remember("fail")

    async def test_recall_success(self):
        # Row layout is [content, source, metadata_json, distance].
        client = self._make_client()
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "results": [
                {
                    "rows": [
                        ["memory content", "test", '{"key": "val"}', 0.1],
                    ]
                }
            ]
        }
        mock_response.raise_for_status = MagicMock()
        client._client = MagicMock()
        client._client.post = AsyncMock(return_value=mock_response)
        with patch("brain.embeddings.get_embedder") as mock_emb:
            mock_embedder = MagicMock()
            mock_embedder.encode_single.return_value = b"\x00" * 16
            mock_emb.return_value = mock_embedder
            results = await client.recall("search query")
            assert len(results) == 1
            assert results[0]["content"] == "memory content"
            # Metadata JSON should be decoded back into a dict.
            assert results[0]["metadata"] == {"key": "val"}

    async def test_recall_with_source_filter(self):
        client = self._make_client()
        mock_response = MagicMock()
        mock_response.json.return_value = {"results": [{"rows": []}]}
        mock_response.raise_for_status = MagicMock()
        client._client = MagicMock()
        client._client.post = AsyncMock(return_value=mock_response)
        with patch("brain.embeddings.get_embedder") as mock_emb:
            mock_embedder = MagicMock()
            mock_embedder.encode_single.return_value = b"\x00" * 16
            mock_emb.return_value = mock_embedder
            results = await client.recall("test", sources=["timmy", "user"])
            assert results == []
            # Check that sources were passed in the SQL
            call_args = client._client.post.call_args
            sql_params = call_args[1]["json"]
            assert "timmy" in sql_params[1] or "timmy" in str(sql_params)

    async def test_recall_error_returns_empty(self):
        # recall() is best-effort: transport errors degrade to an empty list.
        client = self._make_client()
        client._client = MagicMock()
        client._client.post = AsyncMock(side_effect=Exception("timeout"))
        with patch("brain.embeddings.get_embedder") as mock_emb:
            mock_embedder = MagicMock()
            mock_embedder.encode_single.return_value = b"\x00" * 16
            mock_emb.return_value = mock_embedder
            results = await client.recall("test")
            assert results == []

    async def test_get_recent_success(self):
        # Row layout is [id, content, source, tags_json, metadata_json, created_at].
        client = self._make_client()
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "results": [
                {
                    "rows": [
                        [1, "recent memory", "test", '["tag1"]', "{}", "2026-03-06T00:00:00"],
                    ]
                }
            ]
        }
        mock_response.raise_for_status = MagicMock()
        client._client = MagicMock()
        client._client.post = AsyncMock(return_value=mock_response)
        memories = await client.get_recent(hours=24, limit=10)
        assert len(memories) == 1
        assert memories[0]["content"] == "recent memory"
        # Tag JSON should be decoded back into a list.
        assert memories[0]["tags"] == ["tag1"]

    async def test_get_recent_error_returns_empty(self):
        # Like recall(), get_recent() degrades to [] on backend failure.
        client = self._make_client()
        client._client = MagicMock()
        client._client.post = AsyncMock(side_effect=Exception("db error"))
        result = await client.get_recent()
        assert result == []

    async def test_get_context(self):
        # get_context() merges recent activity and relevant recalls into one
        # labelled text blob; stub both inputs and check the sections appear.
        client = self._make_client()
        client.get_recent = AsyncMock(
            return_value=[
                {"content": "Recent item 1"},
                {"content": "Recent item 2"},
            ]
        )
        client.recall = AsyncMock(
            return_value=[
                {"content": "Relevant item 1"},
            ]
        )
        ctx = await client.get_context("test query")
        assert "Recent activity:" in ctx
        assert "Recent item 1" in ctx
        assert "Relevant memories:" in ctx
        assert "Relevant item 1" in ctx
class TestBrainClientTasks:
    """Test task queue operations."""

    def _make_client(self):
        # Same fixed endpoint/node as the memory tests for determinism.
        return BrainClient(rqlite_url="http://test:4001", node_id="test-node")

    async def test_submit_task(self):
        # submit_task() returns the inserted row id and a "queued" status.
        client = self._make_client()
        mock_response = MagicMock()
        mock_response.json.return_value = {"results": [{"last_insert_id": 7}]}
        mock_response.raise_for_status = MagicMock()
        client._client = MagicMock()
        client._client.post = AsyncMock(return_value=mock_response)
        result = await client.submit_task("do something", task_type="shell")
        assert result["id"] == 7
        assert result["status"] == "queued"

    async def test_submit_task_failure_raises(self):
        # Submission failures must propagate (callers need to know the task
        # was never enqueued).
        client = self._make_client()
        client._client = MagicMock()
        client._client.post = AsyncMock(side_effect=Exception("network error"))
        with pytest.raises(Exception, match="network error"):
            await client.submit_task("fail task")

    async def test_claim_task_found(self):
        # Row layout is [id, content, type, priority, metadata_json].
        client = self._make_client()
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "results": [{"rows": [[1, "task content", "shell", 5, '{"key": "val"}']]}]
        }
        mock_response.raise_for_status = MagicMock()
        client._client = MagicMock()
        client._client.post = AsyncMock(return_value=mock_response)
        task = await client.claim_task(["shell", "general"])
        assert task is not None
        assert task["id"] == 1
        assert task["content"] == "task content"
        assert task["metadata"] == {"key": "val"}

    async def test_claim_task_none_available(self):
        # An empty row set means nothing to claim -> None, not an error.
        client = self._make_client()
        mock_response = MagicMock()
        mock_response.json.return_value = {"results": [{"rows": []}]}
        mock_response.raise_for_status = MagicMock()
        client._client = MagicMock()
        client._client.post = AsyncMock(return_value=mock_response)
        task = await client.claim_task(["shell"])
        assert task is None

    async def test_claim_task_error_returns_none(self):
        # Claiming is best-effort: backend errors degrade to None.
        client = self._make_client()
        client._client = MagicMock()
        client._client.post = AsyncMock(side_effect=Exception("raft error"))
        task = await client.claim_task(["general"])
        assert task is None

    async def test_complete_task(self):
        client = self._make_client()
        client._client = MagicMock()
        client._client.post = AsyncMock()
        # Should not raise
        await client.complete_task(1, success=True, result="done")
        client._client.post.assert_awaited_once()

    async def test_complete_task_failure(self):
        # The failure path also issues exactly one POST (recording the error).
        client = self._make_client()
        client._client = MagicMock()
        client._client.post = AsyncMock()
        await client.complete_task(1, success=False, error="oops")
        client._client.post.assert_awaited_once()

    async def test_get_pending_tasks(self):
        # Row layout is [id, content, type, priority, metadata_json, created_at].
        client = self._make_client()
        mock_response = MagicMock()
        mock_response.json.return_value = {
            "results": [
                {
                    "rows": [
                        [1, "task 1", "general", 0, "{}", "2026-03-06"],
                        [2, "task 2", "shell", 5, "{}", "2026-03-06"],
                    ]
                }
            ]
        }
        mock_response.raise_for_status = MagicMock()
        client._client = MagicMock()
        client._client.post = AsyncMock(return_value=mock_response)
        tasks = await client.get_pending_tasks()
        assert len(tasks) == 2

    async def test_get_pending_tasks_error(self):
        # Listing is best-effort: errors degrade to an empty list.
        client = self._make_client()
        client._client = MagicMock()
        client._client.post = AsyncMock(side_effect=Exception("fail"))
        result = await client.get_pending_tasks()
        assert result == []

    async def test_close(self):
        # close() must shut down the underlying HTTP client exactly once.
        client = self._make_client()
        client._client = MagicMock()
        client._client.aclose = AsyncMock()
        await client.close()
        client._client.aclose.assert_awaited_once()

View File

@@ -1,243 +0,0 @@
"""Tests for brain.worker — DistributedWorker capability detection + task execution."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from brain.worker import DistributedWorker
class TestWorkerInit:
    """Worker construction: defaults, injected brain client, handler registry."""

    @patch("brain.worker.DistributedWorker._detect_capabilities")
    def test_init_defaults(self, mock_caps):
        mock_caps.return_value = ["general"]
        worker = DistributedWorker()
        # Starts idle, with an auto-generated node id and the stubbed capability.
        assert worker.running is False
        assert worker.node_id  # non-empty
        assert "general" in worker.capabilities

    @patch("brain.worker.DistributedWorker._detect_capabilities")
    def test_custom_brain_client(self, mock_caps):
        mock_caps.return_value = ["general"]
        injected = MagicMock()
        # An injected brain client is used as-is, not wrapped or copied.
        assert DistributedWorker(brain_client=injected).brain is injected

    @patch("brain.worker.DistributedWorker._detect_capabilities")
    def test_default_handlers_registered(self, mock_caps):
        mock_caps.return_value = ["general"]
        worker = DistributedWorker()
        # Every built-in task type must have a handler registered out of the box.
        for task_type in ("shell", "creative", "code", "research", "general"):
            assert task_type in worker._handlers
class TestCapabilityDetection:
    """Test individual capability detection methods."""

    # NOTE: patching the helper (rather than each test) means capability
    # detection is only stubbed while the worker is being constructed.
    @patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
    def _make_worker(self, mock_caps):
        return DistributedWorker()

    @patch("brain.worker.subprocess.run")
    def test_has_gpu_nvidia(self, mock_run):
        worker = self._make_worker()
        # nvidia-smi exiting 0 counts as a GPU being present.
        mock_run.return_value = MagicMock(returncode=0)
        assert worker._has_gpu() is True

    # Decorators apply bottom-up, so the mock args arrive as
    # (mock_uname, mock_exists, mock_run).
    @patch("brain.worker.subprocess.run", side_effect=OSError("no nvidia-smi"))
    @patch("brain.worker.os.path.exists", return_value=False)
    @patch("brain.worker.os.uname")
    def test_has_gpu_no_gpu(self, mock_uname, mock_exists, mock_run):
        worker = self._make_worker()
        # Linux host with no nvidia-smi and no device nodes -> no GPU.
        mock_uname.return_value = MagicMock(sysname="Linux")
        assert worker._has_gpu() is False

    @patch("brain.worker.subprocess.run")
    def test_has_internet_true(self, mock_run):
        worker = self._make_worker()
        mock_run.return_value = MagicMock(returncode=0)
        assert worker._has_internet() is True

    @patch("brain.worker.subprocess.run", side_effect=OSError("no curl"))
    def test_has_internet_no_curl(self, mock_run):
        # If the probe command itself is missing, report no internet.
        worker = self._make_worker()
        assert worker._has_internet() is False

    @patch("brain.worker.subprocess.run")
    def test_has_command_true(self, mock_run):
        worker = self._make_worker()
        mock_run.return_value = MagicMock(returncode=0)
        assert worker._has_command("docker") is True

    @patch("brain.worker.subprocess.run")
    def test_has_command_false(self, mock_run):
        worker = self._make_worker()
        # Non-zero exit from the lookup means the command is absent.
        mock_run.return_value = MagicMock(returncode=1)
        assert worker._has_command("nonexistent") is False

    @patch("brain.worker.subprocess.run", side_effect=OSError)
    def test_has_command_oserror(self, mock_run):
        # OSError from subprocess is treated the same as "not found".
        worker = self._make_worker()
        assert worker._has_command("anything") is False
class TestRegisterHandler:
    """Registering a handler must also advertise it as a capability."""

    @patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
    def test_register_adds_handler_and_capability(self, mock_caps):
        worker = DistributedWorker()

        async def echo_handler(content):
            return "custom result"

        worker.register_handler("custom_type", echo_handler)
        # Both the dispatch table and the advertised capabilities pick it up.
        assert "custom_type" in worker._handlers
        assert "custom_type" in worker.capabilities
class TestTaskHandlers:
    """Test individual task handlers (code/research/creative/general)."""

    @patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
    def _make_worker(self, mock_caps):
        # Capability detection is stubbed; brain calls are replaced with mocks
        # so handlers can run without a backend.
        worker = DistributedWorker()
        worker.brain = MagicMock()
        worker.brain.remember = AsyncMock()
        worker.brain.complete_task = AsyncMock()
        return worker

    async def test_handle_code(self):
        # The code handler should echo the request content in its result.
        worker = self._make_worker()
        result = await worker._handle_code("write a function")
        assert "write a function" in result

    async def test_handle_research_no_internet(self):
        worker = self._make_worker()
        worker.capabilities = ["general"]  # no "web"
        with pytest.raises(Exception, match="Internet not available"):
            await worker._handle_research("search query")

    async def test_handle_creative_no_gpu(self):
        worker = self._make_worker()
        worker.capabilities = ["general"]  # no "gpu"
        with pytest.raises(Exception, match="GPU not available"):
            await worker._handle_creative("make an image")

    async def test_handle_general_no_ollama(self):
        worker = self._make_worker()
        # Fix: dropped the old "remove ollama if present" branch — it was dead
        # code, since capabilities had just been reassigned to exactly
        # ["general"] and could never contain "ollama".
        worker.capabilities = ["general"]  # but not "ollama"
        with pytest.raises(Exception, match="Ollama not available"):
            await worker._handle_general("answer this")
class TestExecuteTask:
    """Test execute_task orchestration."""

    @patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
    def _make_worker(self, mock_caps):
        # complete_task is mocked so we can verify the worker reports results.
        worker = DistributedWorker()
        worker.brain = MagicMock()
        worker.brain.complete_task = AsyncMock()
        return worker

    async def test_execute_task_success(self):
        # A successful handler result must be returned AND reported to brain.
        worker = self._make_worker()

        async def fake_handler(content):
            return "result"

        worker._handlers["test_type"] = fake_handler
        result = await worker.execute_task(
            {
                "id": 1,
                "type": "test_type",
                "content": "do it",
            }
        )
        assert result["success"] is True
        assert result["result"] == "result"
        worker.brain.complete_task.assert_awaited_once_with(1, success=True, result="result")

    async def test_execute_task_failure(self):
        # Handler exceptions are captured (not propagated) and reported.
        worker = self._make_worker()

        async def failing_handler(content):
            raise RuntimeError("oops")

        worker._handlers["fail_type"] = failing_handler
        result = await worker.execute_task(
            {
                "id": 2,
                "type": "fail_type",
                "content": "fail",
            }
        )
        assert result["success"] is False
        assert "oops" in result["error"]
        worker.brain.complete_task.assert_awaited_once()

    async def test_execute_task_falls_back_to_general(self):
        # Unknown task types are routed to the "general" handler.
        worker = self._make_worker()

        async def general_handler(content):
            return "general result"

        worker._handlers["general"] = general_handler
        result = await worker.execute_task(
            {
                "id": 3,
                "type": "unknown_type",
                "content": "something",
            }
        )
        assert result["success"] is True
        assert result["result"] == "general result"
class TestRunOnce:
    """One poll-loop iteration: claim a task (or not) and report whether work ran."""

    @patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
    def _make_worker(self, mock_caps):
        w = DistributedWorker()
        w.brain = MagicMock()
        w.brain.claim_task = AsyncMock()
        w.brain.complete_task = AsyncMock()
        return w

    async def test_run_once_no_tasks(self):
        # Nothing claimable -> run_once reports that no work was done.
        w = self._make_worker()
        w.brain.claim_task.return_value = None
        assert await w.run_once() is False

    async def test_run_once_with_task(self):
        # A claimed task means the iteration did work.
        w = self._make_worker()
        w.brain.claim_task.return_value = {"id": 1, "type": "code", "content": "write code"}
        assert await w.run_once() is True
class TestStopWorker:
    """stop() must flip the running flag off."""

    @patch("brain.worker.DistributedWorker._detect_capabilities", return_value=["general"])
    def test_stop_sets_running_false(self, mock_caps):
        w = DistributedWorker()
        w.running = True
        w.stop()
        assert w.running is False

View File

@@ -1,416 +0,0 @@
"""Tests for brain.memory — Unified Memory interface.
Tests the local SQLite backend (default). rqlite tests are integration-only.
TDD: These tests define the contract that UnifiedMemory must fulfill.
Any substrate that reads/writes memory goes through this interface.
"""
from __future__ import annotations
import json
import pytest
from brain.memory import UnifiedMemory, get_memory
@pytest.fixture
def memory(tmp_path):
    """Provide a UnifiedMemory backed by a throwaway local SQLite file."""
    return UnifiedMemory(db_path=tmp_path / "test_brain.db", source="test", use_rqlite=False)
# ── Initialization ────────────────────────────────────────────────────────────
class TestUnifiedMemoryInit:
    """Validate database initialization and schema."""

    def test_creates_database_file(self, tmp_path):
        """Database file should be created on init."""
        db_path = tmp_path / "test.db"
        assert not db_path.exists()
        UnifiedMemory(db_path=db_path, use_rqlite=False)
        assert db_path.exists()

    def test_creates_parent_directories(self, tmp_path):
        """Should create parent dirs if they don't exist."""
        db_path = tmp_path / "deep" / "nested" / "brain.db"
        UnifiedMemory(db_path=db_path, use_rqlite=False)
        assert db_path.exists()

    def test_schema_has_memories_table(self, memory):
        """Schema should include memories table."""
        conn = memory._get_conn()
        try:
            cursor = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='memories'"
            )
            assert cursor.fetchone() is not None
        finally:
            # Close explicitly: _get_conn hands out a fresh connection.
            conn.close()

    def test_schema_has_facts_table(self, memory):
        """Schema should include facts table."""
        conn = memory._get_conn()
        try:
            cursor = conn.execute(
                "SELECT name FROM sqlite_master WHERE type='table' AND name='facts'"
            )
            assert cursor.fetchone() is not None
        finally:
            conn.close()

    def test_schema_version_recorded(self, memory):
        """Schema version should be recorded."""
        conn = memory._get_conn()
        try:
            cursor = conn.execute("SELECT version FROM brain_schema_version")
            row = cursor.fetchone()
            assert row is not None
            # Current schema version is pinned at 1.
            assert row["version"] == 1
        finally:
            conn.close()

    def test_idempotent_init(self, tmp_path):
        """Initializing twice on the same DB should not error."""
        db_path = tmp_path / "test.db"
        m1 = UnifiedMemory(db_path=db_path, use_rqlite=False)
        m1.remember_sync("first memory")
        m2 = UnifiedMemory(db_path=db_path, use_rqlite=False)
        # Should not lose data
        results = m2.recall_sync("first")
        assert len(results) >= 1

    def test_wal_mode_enabled(self, memory):
        """Database should use WAL journal mode for concurrency."""
        conn = memory._get_conn()
        try:
            mode = conn.execute("PRAGMA journal_mode").fetchone()[0]
            assert mode == "wal", f"Expected WAL mode, got {mode}"
        finally:
            conn.close()

    def test_busy_timeout_set(self, memory):
        """Database connections should have busy_timeout configured."""
        conn = memory._get_conn()
        try:
            timeout = conn.execute("PRAGMA busy_timeout").fetchone()[0]
            assert timeout == 5000, f"Expected 5000ms busy_timeout, got {timeout}"
        finally:
            conn.close()
# ── Remember (Sync) ──────────────────────────────────────────────────────────
class TestRememberSync:
    """Test synchronous memory storage."""

    def test_remember_returns_id(self, memory):
        """remember_sync should return dict with id and status."""
        result = memory.remember_sync("User prefers dark mode")
        assert "id" in result
        assert result["status"] == "stored"
        assert result["id"] is not None

    def test_remember_stores_content(self, memory):
        """Stored content should be retrievable."""
        memory.remember_sync("The sky is blue")
        results = memory.recall_sync("sky")
        assert len(results) >= 1
        assert "sky" in results[0]["content"].lower()

    def test_remember_with_tags(self, memory):
        """Tags should be stored and retrievable."""
        memory.remember_sync("Dark mode enabled", tags=["preference", "ui"])
        # Inspect the row directly: tags are persisted as a JSON array.
        conn = memory._get_conn()
        try:
            row = conn.execute(
                "SELECT tags FROM memories WHERE content = ?", ("Dark mode enabled",)
            ).fetchone()
            tags = json.loads(row["tags"])
            assert "preference" in tags
            assert "ui" in tags
        finally:
            conn.close()

    def test_remember_with_metadata(self, memory):
        """Metadata should be stored as JSON."""
        memory.remember_sync("Test", metadata={"key": "value", "count": 42})
        conn = memory._get_conn()
        try:
            row = conn.execute("SELECT metadata FROM memories WHERE content = 'Test'").fetchone()
            meta = json.loads(row["metadata"])
            assert meta["key"] == "value"
            assert meta["count"] == 42
        finally:
            conn.close()

    def test_remember_with_custom_source(self, memory):
        """Source should default to self.source but be overridable."""
        memory.remember_sync("From timmy", source="timmy")
        memory.remember_sync("From user", source="user")
        conn = memory._get_conn()
        try:
            rows = conn.execute("SELECT source FROM memories ORDER BY id").fetchall()
            sources = [r["source"] for r in rows]
            assert "timmy" in sources
            assert "user" in sources
        finally:
            conn.close()

    def test_remember_default_source(self, memory):
        """Default source should be the one set at init."""
        memory.remember_sync("Default source test")
        conn = memory._get_conn()
        try:
            row = conn.execute("SELECT source FROM memories").fetchone()
            assert row["source"] == "test"  # set in fixture
        finally:
            conn.close()

    def test_remember_multiple(self, memory):
        """Multiple memories should be stored independently."""
        for i in range(5):
            memory.remember_sync(f"Memory number {i}")
        conn = memory._get_conn()
        try:
            count = conn.execute("SELECT COUNT(*) FROM memories").fetchone()[0]
            assert count == 5
        finally:
            conn.close()
# ── Recall (Sync) ─────────────────────────────────────────────────────────────
class TestRecallSync:
    """Synchronous recall via the keyword-matching fallback path."""

    def test_recall_finds_matching(self, memory):
        """Queries should surface the memory whose content matches."""
        memory.remember_sync("Bitcoin price is rising")
        memory.remember_sync("Weather is sunny today")
        hits = memory.recall_sync("Bitcoin")
        assert len(hits) >= 1
        assert "Bitcoin" in hits[0]["content"]

    def test_recall_low_score_for_irrelevant(self, memory):
        """Irrelevant queries should score low.

        Semantic search may still return *something* (embeddings always have
        some similarity); the keyword fallback returns nothing at all. Either
        outcome is fine as long as any hit scores below the threshold.
        """
        memory.remember_sync("Bitcoin price is rising fast")
        hits = memory.recall_sync("underwater basket weaving")
        if not hits:
            return  # keyword fallback found no substring match — acceptable
        assert hits[0]["score"] < 0.7, (
            f"Expected low score for irrelevant query, got {hits[0]['score']}"
        )

    def test_recall_respects_limit(self, memory):
        """No more than `limit` results come back."""
        for n in range(10):
            memory.remember_sync(f"Bitcoin memory {n}")
        assert len(memory.recall_sync("Bitcoin", limit=3)) <= 3

    def test_recall_filters_by_source(self, memory):
        """A sources filter restricts which rows are eligible."""
        memory.remember_sync("From timmy", source="timmy")
        memory.remember_sync("From user about timmy", source="user")
        for hit in memory.recall_sync("timmy", sources=["user"]):
            assert hit["source"] == "user"

    def test_recall_returns_score(self, memory):
        """Every recall hit carries a score field."""
        memory.remember_sync("Test memory for scoring")
        hits = memory.recall_sync("Test")
        assert len(hits) >= 1
        assert "score" in hits[0]
# ── Facts ─────────────────────────────────────────────────────────────────────
class TestFacts:
    """Test long-term fact storage."""

    def test_store_fact_returns_id(self, memory):
        """store_fact_sync should return dict with id and status."""
        result = memory.store_fact_sync("user_preference", "Prefers dark mode")
        assert "id" in result
        assert result["status"] == "stored"

    def test_get_facts_by_category(self, memory):
        """get_facts_sync should filter by category."""
        memory.store_fact_sync("user_preference", "Likes dark mode")
        memory.store_fact_sync("user_fact", "Lives in Texas")
        prefs = memory.get_facts_sync(category="user_preference")
        assert len(prefs) == 1
        assert "dark mode" in prefs[0]["content"]

    def test_get_facts_by_query(self, memory):
        """get_facts_sync should support keyword search."""
        memory.store_fact_sync("user_preference", "Likes dark mode")
        memory.store_fact_sync("user_preference", "Prefers Bitcoin")
        results = memory.get_facts_sync(query="Bitcoin")
        assert len(results) == 1
        assert "Bitcoin" in results[0]["content"]

    def test_fact_access_count_increments(self, memory):
        """Accessing a fact should increment its access_count."""
        memory.store_fact_sync("test_cat", "Test fact")
        # First access — count starts at 0, then gets incremented
        facts = memory.get_facts_sync(category="test_cat")
        first_count = facts[0]["access_count"]
        # Second access — count should be higher
        facts = memory.get_facts_sync(category="test_cat")
        second_count = facts[0]["access_count"]
        assert second_count > first_count, (
            f"Access count should increment: {first_count} -> {second_count}"
        )

    def test_fact_confidence_ordering(self, memory):
        """Facts should be ordered by confidence (highest first)."""
        memory.store_fact_sync("cat", "Low confidence fact", confidence=0.3)
        memory.store_fact_sync("cat", "High confidence fact", confidence=0.9)
        facts = memory.get_facts_sync(category="cat")
        assert facts[0]["confidence"] > facts[1]["confidence"]
# ── Recent Memories ───────────────────────────────────────────────────────────
class TestRecentSync:
    """Retrieval of recently stored memories."""

    def test_get_recent_returns_recent(self, memory):
        """A memory stored moments ago shows up in the recent window."""
        memory.remember_sync("Just happened")
        recent = memory.get_recent_sync(hours=1, limit=10)
        assert len(recent) >= 1
        assert "Just happened" in recent[0]["content"]

    def test_get_recent_respects_limit(self, memory):
        """No more than `limit` rows come back."""
        for n in range(10):
            memory.remember_sync(f"Recent {n}")
        assert len(memory.get_recent_sync(hours=1, limit=3)) <= 3

    def test_get_recent_filters_by_source(self, memory):
        """A sources filter restricts which rows are returned."""
        memory.remember_sync("From timmy", source="timmy")
        memory.remember_sync("From user", source="user")
        for row in memory.get_recent_sync(hours=1, sources=["timmy"]):
            assert row["source"] == "timmy"
# ── Stats ─────────────────────────────────────────────────────────────────────
class TestStats:
    """Memory statistics reporting."""

    def test_stats_returns_counts(self, memory):
        """Counts reflect exactly what was stored, and the backend is named."""
        memory.remember_sync("Memory 1")
        memory.remember_sync("Memory 2")
        memory.store_fact_sync("cat", "Fact 1")
        stats = memory.get_stats()
        expected = {"memory_count": 2, "fact_count": 1, "backend": "local_sqlite"}
        for key, value in expected.items():
            assert stats[key] == value

    def test_stats_empty_db(self, memory):
        """An empty database yields zero counts rather than an error."""
        stats = memory.get_stats()
        assert stats["memory_count"] == 0
        assert stats["fact_count"] == 0
# ── Identity Integration ─────────────────────────────────────────────────────
class TestIdentityIntegration:
    """The identity system was removed; both accessors are stubs returning ""."""

    def test_get_identity_returns_empty(self, memory):
        identity = memory.get_identity()
        assert identity == ""

    def test_get_identity_for_prompt_returns_empty(self, memory):
        prompt_identity = memory.get_identity_for_prompt()
        assert prompt_identity == ""
# ── Singleton ─────────────────────────────────────────────────────────────────
class TestSingleton:
    """Test the module-level get_memory() singleton."""

    def test_get_memory_returns_instance(self):
        """get_memory() should return a UnifiedMemory instance."""
        import brain.memory as mem_module

        # Reset singleton for test isolation
        mem_module._default_memory = None
        m = get_memory()
        assert isinstance(m, UnifiedMemory)

    def test_get_memory_returns_same_instance(self):
        """get_memory() should return the same instance on repeated calls."""
        import brain.memory as mem_module

        # Clear any instance left over from other tests before checking identity.
        mem_module._default_memory = None
        m1 = get_memory()
        m2 = get_memory()
        assert m1 is m2
# ── Async Interface ───────────────────────────────────────────────────────────
@pytest.mark.asyncio
class TestAsyncInterface:
    """Test async wrappers (which delegate to sync for local SQLite)."""

    async def test_async_remember(self, memory):
        """Async remember should work."""
        result = await memory.remember("Async memory test")
        assert result["status"] == "stored"

    async def test_async_recall(self, memory):
        """Async recall should work."""
        await memory.remember("Async recall target")
        results = await memory.recall("Async recall")
        assert len(results) >= 1

    async def test_async_store_fact(self, memory):
        """Async store_fact should work."""
        result = await memory.store_fact("test", "Async fact")
        assert result["status"] == "stored"

    async def test_async_get_facts(self, memory):
        """Async get_facts should work."""
        await memory.store_fact("test", "Async fact retrieval")
        facts = await memory.get_facts(category="test")
        assert len(facts) >= 1

    async def test_async_get_recent(self, memory):
        """Async get_recent should work."""
        await memory.remember("Recent async memory")
        results = await memory.get_recent(hours=1)
        assert len(results) >= 1

    async def test_async_get_context(self, memory):
        """Async get_context should return formatted context."""
        await memory.remember("Context test memory")
        context = await memory.get_context("test")
        # Context is a non-empty formatted string built from stored memories.
        assert isinstance(context, str)
        assert len(context) > 0

View File

@@ -20,6 +20,8 @@ except ImportError:
for _mod in [
"airllm",
"mcp",
"mcp.client",
"mcp.client.stdio",
"mcp.registry",
"telegram",
"telegram.ext",

View File

@@ -1,145 +0,0 @@
"""Tests for the Paperclip API routes."""
from unittest.mock import AsyncMock, MagicMock, patch
# ── GET /api/paperclip/status ────────────────────────────────────────────────
def test_status_disabled(client):
    """With paperclip_enabled off, the status endpoint reports disabled."""
    resp = client.get("/api/paperclip/status")
    assert resp.status_code == 200
    assert resp.json()["enabled"] is False
def test_status_enabled(client):
    # With the integration enabled and the bridge mocked, the endpoint should
    # relay the bridge's status payload (here: connected=True).
    mock_status = MagicMock()
    mock_status.model_dump.return_value = {
        "enabled": True,
        "connected": True,
        "paperclip_url": "http://vps:3100",
        "company_id": "comp-1",
        "agent_count": 3,
        "issue_count": 5,
        "error": None,
    }
    mock_bridge = MagicMock()
    mock_bridge.get_status = AsyncMock(return_value=mock_status)
    with patch("dashboard.routes.paperclip.settings") as mock_settings:
        mock_settings.paperclip_enabled = True
        # NOTE(review): patch.dict with an empty mapping looks like a no-op
        # guard around module re-imports — confirm it is still needed.
        with patch.dict("sys.modules", {}):
            with patch("integrations.paperclip.bridge.bridge", mock_bridge):
                response = client.get("/api/paperclip/status")
                assert response.status_code == 200
                assert response.json()["connected"] is True
# ── GET /api/paperclip/issues ────────────────────────────────────────────────
def test_list_issues_disabled(client):
    """Issue listing reports disabled when the integration is off."""
    resp = client.get("/api/paperclip/issues")
    assert resp.status_code == 200
    assert resp.json()["enabled"] is False
# ── POST /api/paperclip/issues ───────────────────────────────────────────────
def test_create_issue_disabled(client):
    """Creating an issue while disabled returns the disabled marker, not 4xx."""
    resp = client.post("/api/paperclip/issues", json={"title": "Test"})
    assert resp.status_code == 200
    assert resp.json()["enabled"] is False
def test_create_issue_missing_title(client):
    """A title is mandatory once the integration is enabled."""
    with patch("dashboard.routes.paperclip.settings") as mock_settings:
        mock_settings.paperclip_enabled = True
        resp = client.post("/api/paperclip/issues", json={"description": "No title"})
        assert resp.status_code == 400
        assert "title" in resp.json()["error"]
# ── POST /api/paperclip/issues/{id}/delegate ─────────────────────────────────
def test_delegate_issue_missing_agent_id(client):
    """Delegation without an agent_id is rejected with a 400."""
    with patch("dashboard.routes.paperclip.settings") as mock_settings:
        mock_settings.paperclip_enabled = True
        resp = client.post("/api/paperclip/issues/i1/delegate", json={"message": "Do this"})
        assert resp.status_code == 400
        assert "agent_id" in resp.json()["error"]
# ── POST /api/paperclip/issues/{id}/comment ──────────────────────────────────
def test_add_comment_missing_content(client):
    """A comment with no content is rejected with HTTP 400."""
    with patch("dashboard.routes.paperclip.settings") as mock_settings:
        mock_settings.paperclip_enabled = True
        resp = client.post("/api/paperclip/issues/i1/comment", json={})
    assert resp.status_code == 400
    assert "content" in resp.json()["error"]
# ── GET /api/paperclip/agents ────────────────────────────────────────────────
def test_list_agents_disabled(client):
    """Paperclip disabled: the agents endpoint reports enabled=False."""
    resp = client.get("/api/paperclip/agents")
    assert resp.status_code == 200
    assert resp.json()["enabled"] is False
# ── GET /api/paperclip/goals ─────────────────────────────────────────────────
def test_list_goals_disabled(client):
    """Paperclip disabled: the goals endpoint reports enabled=False."""
    resp = client.get("/api/paperclip/goals")
    assert resp.status_code == 200
    assert resp.json()["enabled"] is False
# ── POST /api/paperclip/goals ────────────────────────────────────────────────
def test_create_goal_missing_title(client):
    """A goal without a title is rejected with HTTP 400."""
    with patch("dashboard.routes.paperclip.settings") as mock_settings:
        mock_settings.paperclip_enabled = True
        resp = client.post("/api/paperclip/goals", json={"description": "No title"})
    assert resp.status_code == 400
    assert "title" in resp.json()["error"]
# ── GET /api/paperclip/approvals ─────────────────────────────────────────────
def test_list_approvals_disabled(client):
    """Paperclip disabled: the approvals endpoint reports enabled=False."""
    resp = client.get("/api/paperclip/approvals")
    assert resp.status_code == 200
    assert resp.json()["enabled"] is False
# ── GET /api/paperclip/runs ──────────────────────────────────────────────────
def test_list_runs_disabled(client):
    """Paperclip disabled: the runs endpoint reports enabled=False."""
    resp = client.get("/api/paperclip/runs")
    assert resp.status_code == 200
    assert resp.json()["enabled"] is False

View File

@@ -94,23 +94,7 @@ def test_creative_page_returns_200(client):
# ---------------------------------------------------------------------------
def test_swarm_live_page_returns_200(client):
    """The live swarm dashboard page renders successfully."""
    resp = client.get("/swarm/live")
    assert resp.status_code == 200
def test_swarm_live_websocket_sends_initial_state(client):
    """The /swarm/live WebSocket greets each new connection with initial_state."""
    with client.websocket_connect("/swarm/live") as ws:
        msg = ws.receive_json()
        # The very first frame must be the full snapshot, not an event.
        assert msg.get("type") == "initial_state", f"Unexpected WS message: {msg}"
        state = msg.get("data", {})
        for key in ("agents", "tasks", "auctions"):
            assert key in state
# Swarm live page tests removed — swarm module deleted.
# ---------------------------------------------------------------------------
@@ -262,9 +246,6 @@ def test_all_dashboard_pages_return_200(client):
"/tasks",
"/briefing",
"/thinking",
"/swarm/mission-control",
"/swarm/live",
"/swarm/events",
"/bugs",
"/tools",
"/lightning/ledger",

View File

@@ -1,66 +0,0 @@
"""Tests for swarm.event_log — WAL mode, basic operations, and EventBus bridge."""
import pytest
from swarm.event_log import EventType, _ensure_db, log_event
@pytest.fixture(autouse=True)
def tmp_event_db(tmp_path, monkeypatch):
    """Point swarm.event_log at a throwaway SQLite file for every test."""
    path = tmp_path / "events.db"
    monkeypatch.setattr("swarm.event_log.DB_PATH", path)
    yield path
class TestEventLogWAL:
    """The event-log SQLite database must run in WAL mode with a busy timeout."""
    def test_event_db_uses_wal(self):
        conn = _ensure_db()
        try:
            journal = conn.execute("PRAGMA journal_mode").fetchone()[0]
        finally:
            conn.close()
        assert journal == "wal", f"Expected WAL mode, got {journal}"
    def test_event_db_busy_timeout(self):
        conn = _ensure_db()
        try:
            busy_ms = conn.execute("PRAGMA busy_timeout").fetchone()[0]
        finally:
            conn.close()
        assert busy_ms == 5000
class TestEventLogBasics:
    """Smoke tests for log_event: return value, persistence, payload round-trip."""
    def test_log_event_returns_entry(self):
        entry = log_event(EventType.SYSTEM_INFO, source="test", data={"msg": "hello"})
        assert entry.id
        assert entry.source == "test"
        assert entry.event_type == EventType.SYSTEM_INFO
    def test_log_event_persists(self):
        from swarm.event_log import get_task_events
        log_event(EventType.TASK_CREATED, source="test", task_id="t1")
        stored = get_task_events("t1")
        assert len(stored) == 1
        assert stored[0].event_type == EventType.TASK_CREATED
    def test_log_event_with_agent_id(self):
        payload = {"persona_id": "forge"}
        entry = log_event(
            EventType.AGENT_JOINED,
            source="test",
            agent_id="forge",
            data=payload,
        )
        assert entry.agent_id == "forge"
    def test_log_event_data_roundtrip(self):
        entry = log_event(
            EventType.BID_SUBMITTED,
            source="test",
            data={"bid_sats": 42, "reason": "testing"},
        )
        assert entry.data["bid_sats"] == 42
        assert entry.data["reason"] == "testing"

View File

@@ -1,205 +0,0 @@
"""Tests for the Paperclip bridge (CEO orchestration logic)."""
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from integrations.paperclip.bridge import PaperclipBridge
from integrations.paperclip.client import PaperclipClient
from integrations.paperclip.models import PaperclipAgent, PaperclipGoal, PaperclipIssue
@pytest.fixture
def mock_client():
    """A PaperclipClient stand-in: every API method is an AsyncMock."""
    stub = MagicMock(spec=PaperclipClient)
    # Method name → awaited return value; keeps the stub table-driven.
    async_returns = {
        "healthy": True,
        "list_agents": [],
        "list_issues": [],
        "list_goals": [],
        "list_approvals": [],
        "list_heartbeat_runs": [],
        "get_issue": None,
        "get_org": None,
        "create_issue": None,
        "update_issue": None,
        "add_comment": None,
        "wake_agent": None,
        "create_goal": None,
        "approve": None,
        "reject": None,
        "cancel_run": None,
        "list_comments": [],
    }
    for method, value in async_returns.items():
        setattr(stub, method, AsyncMock(return_value=value))
    return stub
@pytest.fixture
def bridge(mock_client):
    # Bridge under test, backed entirely by the stubbed client above.
    return PaperclipBridge(client=mock_client)
# ── status ───────────────────────────────────────────────────────────────────
async def test_status_when_disabled(bridge):
    """get_status reflects the disabled flag straight from settings."""
    with patch("integrations.paperclip.bridge.settings") as cfg:
        cfg.paperclip_enabled = False
        cfg.paperclip_url = "http://localhost:3100"
        result = await bridge.get_status()
    assert result.enabled is False
async def test_status_when_connected(bridge, mock_client):
    """A healthy client yields connected=True plus agent/issue counts."""
    mock_client.healthy.return_value = True
    mock_client.list_agents.return_value = [PaperclipAgent(id="a1", name="Codex")]
    mock_client.list_issues.return_value = [
        PaperclipIssue(id="i1", title="Bug"),
        PaperclipIssue(id="i2", title="Feature"),
    ]
    with patch("integrations.paperclip.bridge.settings") as cfg:
        cfg.paperclip_enabled = True
        cfg.paperclip_url = "http://vps:3100"
        cfg.paperclip_company_id = "comp-1"
        result = await bridge.get_status()
    assert result.enabled is True
    assert result.connected is True
    assert result.agent_count == 1
    assert result.issue_count == 2
async def test_status_when_disconnected(bridge, mock_client):
    """An unreachable Paperclip yields connected=False and an error message."""
    mock_client.healthy.return_value = False
    with patch("integrations.paperclip.bridge.settings") as cfg:
        cfg.paperclip_enabled = True
        cfg.paperclip_url = "http://vps:3100"
        cfg.paperclip_company_id = "comp-1"
        result = await bridge.get_status()
    assert result.enabled is True
    assert result.connected is False
    assert "Cannot reach" in result.error
# ── create and assign ────────────────────────────────────────────────────────
async def test_create_and_assign_with_wake(bridge, mock_client):
    """create_and_assign creates the issue and then wakes the assignee."""
    created = PaperclipIssue(id="i1", title="Deploy v2")
    mock_client.create_issue.return_value = created
    mock_client.wake_agent.return_value = {"status": "queued"}
    issue = await bridge.create_and_assign(
        title="Deploy v2",
        assignee_id="agent-codex",
        wake=True,
    )
    assert issue is not None
    assert issue.id == "i1"
    mock_client.wake_agent.assert_awaited_once_with("agent-codex", issue_id="i1")
async def test_create_and_assign_no_wake(bridge, mock_client):
    """With wake=False the assignee is never woken."""
    mock_client.create_issue.return_value = PaperclipIssue(id="i2", title="Research task")
    issue = await bridge.create_and_assign(
        title="Research task",
        assignee_id="agent-research",
        wake=False,
    )
    assert issue is not None
    mock_client.wake_agent.assert_not_awaited()
async def test_create_and_assign_failure(bridge, mock_client):
    """A failed issue creation propagates as None."""
    mock_client.create_issue.return_value = None
    assert await bridge.create_and_assign(title="Will fail") is None
# ── delegate ─────────────────────────────────────────────────────────────────
async def test_delegate_issue(bridge, mock_client):
    """Delegation reassigns the issue, leaves a comment, and wakes the agent."""
    mock_client.update_issue.return_value = PaperclipIssue(id="i1", title="Task")
    mock_client.wake_agent.return_value = {"status": "queued"}
    assert await bridge.delegate_issue("i1", "agent-codex", message="Handle this") is True
    mock_client.add_comment.assert_awaited_once()
    mock_client.wake_agent.assert_awaited_once()
async def test_delegate_issue_update_fails(bridge, mock_client):
    """If the reassignment update fails, delegation reports False."""
    mock_client.update_issue.return_value = None
    assert await bridge.delegate_issue("i1", "agent-codex") is False
# ── close issue ──────────────────────────────────────────────────────────────
async def test_close_issue(bridge, mock_client):
    """Closing an issue succeeds and posts the closing comment."""
    mock_client.update_issue.return_value = PaperclipIssue(id="i1", title="Done")
    assert await bridge.close_issue("i1", comment="Shipped!") is True
    mock_client.add_comment.assert_awaited_once()
# ── goals ────────────────────────────────────────────────────────────────────
async def test_set_goal(bridge, mock_client):
    """set_goal returns the goal created by the client."""
    mock_client.create_goal.return_value = PaperclipGoal(id="g1", title="Ship MVP")
    created = await bridge.set_goal("Ship MVP")
    assert created is not None
    assert created.title == "Ship MVP"
# ── approvals ────────────────────────────────────────────────────────────────
async def test_approve(bridge, mock_client):
    """A successful approval returns True."""
    mock_client.approve.return_value = {"status": "approved"}
    assert await bridge.approve("ap1") is True
async def test_reject(bridge, mock_client):
    """A successful rejection (with comment) returns True."""
    mock_client.reject.return_value = {"status": "rejected"}
    assert await bridge.reject("ap1", comment="Needs work") is True
async def test_approve_failure(bridge, mock_client):
    """A failed approval call returns False."""
    mock_client.approve.return_value = None
    assert await bridge.approve("ap1") is False
# ── runs ─────────────────────────────────────────────────────────────────────
async def test_active_runs(bridge, mock_client):
    """active_runs surfaces the heartbeat runs reported by the client."""
    mock_client.list_heartbeat_runs.return_value = [{"id": "r1", "status": "running"}]
    runs = await bridge.active_runs()
    assert len(runs) == 1
async def test_cancel_run(bridge, mock_client):
    """A successful run cancellation returns True."""
    mock_client.cancel_run.return_value = {"status": "cancelled"}
    assert await bridge.cancel_run("r1") is True

View File

@@ -1,213 +0,0 @@
"""Tests for the Paperclip API client.
Uses httpx.MockTransport so every test exercises the real HTTP path
(_get/_post/_delete, status-code handling, JSON parsing, error paths)
instead of patching the transport methods away.
"""
import json
from unittest.mock import patch
import httpx
from integrations.paperclip.client import PaperclipClient
from integrations.paperclip.models import CreateIssueRequest
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _mock_transport(routes: dict[str, tuple[int, dict | list | None]]):
    """Build an httpx.MockTransport from a {"METHOD /path": (status, body)} map.

    Unmatched requests get a JSON 404 so typo'd routes fail loudly.

    Example:
        _mock_transport({
            "GET /api/health": (200, {"status": "ok"}),
            "DELETE /api/issues/i1": (204, None),
        })
    """
    def handler(request: httpx.Request) -> httpx.Response:
        entry = routes.get(f"{request.method} {request.url.path}")
        if entry is None:
            return httpx.Response(404, json={"error": "not found"})
        status, body = entry
        payload = b"" if body is None else json.dumps(body).encode()
        return httpx.Response(
            status, content=payload, headers={"content-type": "application/json"}
        )
    return httpx.MockTransport(handler)
def _client_with(routes: dict[str, tuple[int, dict | list | None]]) -> PaperclipClient:
    """PaperclipClient whose internal AsyncClient talks to a mock transport."""
    pc = PaperclipClient(base_url="http://fake:3100", api_key="test-key")
    pc._client = httpx.AsyncClient(
        base_url="http://fake:3100",
        transport=_mock_transport(routes),
        headers={"Accept": "application/json", "Authorization": "Bearer test-key"},
    )
    return pc
# ---------------------------------------------------------------------------
# health
# ---------------------------------------------------------------------------
async def test_healthy_returns_true_on_200():
    """A 200 from /api/health means the service is healthy."""
    pc = _client_with({"GET /api/health": (200, {"status": "ok"})})
    assert await pc.healthy() is True
async def test_healthy_returns_false_on_500():
    """A 5xx health response is reported as unhealthy."""
    pc = _client_with({"GET /api/health": (500, {"error": "down"})})
    assert await pc.healthy() is False
async def test_healthy_returns_false_on_404():
    """With no routes registered the mock returns 404 — also unhealthy."""
    pc = _client_with({})  # no routes → 404
    assert await pc.healthy() is False
# ---------------------------------------------------------------------------
# agents
# ---------------------------------------------------------------------------
async def test_list_agents_parses_response():
    """Agent JSON is parsed into PaperclipAgent models."""
    payload = [{"id": "a1", "name": "Codex", "role": "engineer", "status": "active"}]
    pc = _client_with({"GET /api/companies/comp-1/agents": (200, payload)})
    agents = await pc.list_agents(company_id="comp-1")
    assert len(agents) == 1
    assert agents[0].id == "a1"
    assert agents[0].name == "Codex"
async def test_list_agents_empty_on_server_error():
    """A 503 from the agents endpoint degrades to an empty list."""
    pc = _client_with({"GET /api/companies/comp-1/agents": (503, None)})
    assert await pc.list_agents(company_id="comp-1") == []
async def test_list_agents_graceful_on_404():
    """An unknown company (404) also degrades to an empty list."""
    pc = _client_with({})
    assert await pc.list_agents(company_id="comp-1") == []
# ---------------------------------------------------------------------------
# issues
# ---------------------------------------------------------------------------
async def test_list_issues():
    """Issue JSON is parsed into PaperclipIssue models."""
    payload = [{"id": "i1", "title": "Fix bug"}]
    pc = _client_with({"GET /api/companies/comp-1/issues": (200, payload)})
    issues = await pc.list_issues(company_id="comp-1")
    assert len(issues) == 1
    assert issues[0].title == "Fix bug"
async def test_get_issue():
    """Fetching a single issue by id returns a parsed model."""
    body = {"id": "i1", "title": "Fix bug", "description": "It's broken"}
    pc = _client_with({"GET /api/issues/i1": (200, body)})
    fetched = await pc.get_issue("i1")
    assert fetched is not None
    assert fetched.id == "i1"
async def test_get_issue_not_found():
    """A 404 for an unknown issue id maps to None."""
    pc = _client_with({"GET /api/issues/nonexistent": (404, None)})
    assert await pc.get_issue("nonexistent") is None
async def test_create_issue():
    """POSTing a CreateIssueRequest yields the created issue model."""
    body = {"id": "i2", "title": "New feature"}
    pc = _client_with({"POST /api/companies/comp-1/issues": (201, body)})
    created = await pc.create_issue(CreateIssueRequest(title="New feature"), company_id="comp-1")
    assert created is not None
    assert created.id == "i2"
async def test_create_issue_no_company_id():
    """Missing company_id returns None without making any HTTP call."""
    pc = _client_with({})
    with patch("integrations.paperclip.client.settings") as cfg:
        cfg.paperclip_company_id = ""
        created = await pc.create_issue(CreateIssueRequest(title="Test"))
    assert created is None
async def test_delete_issue_returns_true_on_success():
    """A 204 from DELETE maps to True."""
    pc = _client_with({"DELETE /api/issues/i1": (204, None)})
    assert await pc.delete_issue("i1") is True
async def test_delete_issue_returns_false_on_error():
    """A 500 from DELETE maps to False."""
    pc = _client_with({"DELETE /api/issues/i1": (500, None)})
    assert await pc.delete_issue("i1") is False
# ---------------------------------------------------------------------------
# comments
# ---------------------------------------------------------------------------
async def test_add_comment():
    """Posting a comment returns the parsed comment model."""
    body = {"id": "c1", "issue_id": "i1", "content": "Done"}
    pc = _client_with({"POST /api/issues/i1/comments": (201, body)})
    posted = await pc.add_comment("i1", "Done")
    assert posted is not None
    assert posted.content == "Done"
async def test_list_comments():
    """Listing comments parses each entry."""
    body = [{"id": "c1", "issue_id": "i1", "content": "LGTM"}]
    pc = _client_with({"GET /api/issues/i1/comments": (200, body)})
    assert len(await pc.list_comments("i1")) == 1
# ---------------------------------------------------------------------------
# goals
# ---------------------------------------------------------------------------
async def test_list_goals():
    """Goal JSON is parsed into PaperclipGoal models."""
    body = [{"id": "g1", "title": "Ship MVP"}]
    pc = _client_with({"GET /api/companies/comp-1/goals": (200, body)})
    goals = await pc.list_goals(company_id="comp-1")
    assert len(goals) == 1
    assert goals[0].title == "Ship MVP"
async def test_create_goal():
    """Creating a goal returns the parsed model."""
    body = {"id": "g2", "title": "Scale to 1000 users"}
    pc = _client_with({"POST /api/companies/comp-1/goals": (201, body)})
    assert await pc.create_goal("Scale to 1000 users", company_id="comp-1") is not None
# ---------------------------------------------------------------------------
# heartbeat runs
# ---------------------------------------------------------------------------
async def test_list_heartbeat_runs():
    """Heartbeat runs come back as a one-element list on 200."""
    body = [{"id": "r1", "agent_id": "a1", "status": "running"}]
    pc = _client_with({"GET /api/companies/comp-1/heartbeat-runs": (200, body)})
    assert len(await pc.list_heartbeat_runs(company_id="comp-1")) == 1
async def test_list_heartbeat_runs_server_error():
    """A 500 from the runs endpoint degrades to an empty list."""
    pc = _client_with({"GET /api/companies/comp-1/heartbeat-runs": (500, None)})
    assert await pc.list_heartbeat_runs(company_id="comp-1") == []

View File

@@ -1,936 +0,0 @@
"""Integration tests for the Paperclip task runner — full green-path workflow.
Tests the complete autonomous cycle with a StubOrchestrator that exercises
the real pipe (TaskRunner → orchestrator.execute_task → bridge → client)
while stubbing only the LLM intelligence layer.
Green path:
1. Timmy grabs first task in queue
2. Orchestrator.execute_task processes it (stub returns input-aware response)
3. Timmy posts completion comment and marks issue done
4. Timmy creates a recursive follow-up task for himself
The stub is deliberately input-aware — it echoes back task metadata so
assertions can prove data actually flowed through the pipe, not just that
methods were called.
Live-LLM tests (``@pytest.mark.ollama``) are at the bottom; they hit a real
tiny model via Ollama and are skipped when Ollama is not running.
Run them with: ``tox -e ollama`` or ``pytest -m ollama``
"""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from integrations.paperclip.bridge import PaperclipBridge
from integrations.paperclip.client import PaperclipClient
from integrations.paperclip.models import PaperclipIssue
from integrations.paperclip.task_runner import TaskRunner
# ── Constants ─────────────────────────────────────────────────────────────────
# Agent/company identifiers shared by every fixture and assertion below.
TIMMY_AGENT_ID = "agent-timmy"
COMPANY_ID = "comp-1"
# ── StubOrchestrator: exercises the pipe, stubs the intelligence ──────────────
class StubOrchestrator:
    """Deterministic orchestrator that proves data flows through the pipe.

    Echoes task metadata (id, title, priority) back in its result string so
    assertions can verify the pipe carried real data, and records every call
    in ``self.calls`` for post-hoc inspection.
    """
    def __init__(self) -> None:
        self.calls: list[dict] = []
    async def execute_task(self, task_id: str, description: str, context: dict) -> dict:
        # Record the call (context copied so later mutation can't skew asserts).
        self.calls.append(
            {"task_id": task_id, "description": description, "context": dict(context)}
        )
        title = context.get("title", description[:50])
        priority = context.get("priority", "normal")
        result_text = (
            f"[Orchestrator] Processed '{title}'. "
            f"Task {task_id} handled with priority {priority}. "
            "Self-reflection: my task automation loop is functioning. "
            "I should create a follow-up to review this pattern."
        )
        return {
            "task_id": task_id,
            "agent": "orchestrator",
            "result": result_text,
            "status": "completed",
        }
# ── Fixtures ──────────────────────────────────────────────────────────────────
@pytest.fixture
def stub_orchestrator():
    # Fresh StubOrchestrator per test: the call log starts empty.
    return StubOrchestrator()
@pytest.fixture
def mock_client():
    """Fully stubbed PaperclipClient with async methods."""
    stub = MagicMock(spec=PaperclipClient)
    # Method name → awaited return value; keeps the stub table-driven.
    async_returns = {
        "healthy": True,
        "list_issues": [],
        "get_issue": None,
        "create_issue": None,
        "update_issue": None,
        "delete_issue": True,
        "add_comment": None,
        "list_comments": [],
        "checkout_issue": {"ok": True},
        "release_issue": {"ok": True},
        "wake_agent": None,
        "list_agents": [],
        "list_goals": [],
        "create_goal": None,
        "list_approvals": [],
        "list_heartbeat_runs": [],
        "cancel_run": None,
        "approve": None,
        "reject": None,
    }
    for method, value in async_returns.items():
        setattr(stub, method, AsyncMock(return_value=value))
    return stub
@pytest.fixture
def bridge(mock_client):
    # Bridge under test, wired to the fully stubbed client above.
    return PaperclipBridge(client=mock_client)
@pytest.fixture
def settings_patch():
    """Patch settings for all task runner tests.

    Patches the module-level ``settings`` in both the task runner and the
    bridge with identical values, then yields the task-runner mock so
    individual tests can tweak fields (e.g. clear the agent id).
    """
    with (
        patch("integrations.paperclip.task_runner.settings") as ts,
        patch("integrations.paperclip.bridge.settings") as bs,
    ):
        # Both modules must agree on agent/company IDs, or the runner and
        # bridge would disagree about which agent owns the queue.
        for s in (ts, bs):
            s.paperclip_enabled = True
            s.paperclip_agent_id = TIMMY_AGENT_ID
            s.paperclip_company_id = COMPANY_ID
            s.paperclip_url = "http://fake:3100"
            s.paperclip_poll_interval = 0
        yield ts
# ── Helpers ───────────────────────────────────────────────────────────────────
def _make_issue(
    id: str = "issue-1",
    title: str = "Muse about task automation",
    description: str = "Reflect on how you handle tasks and write a recursive self-improvement task.",
    status: str = "open",
    assignee_id: str = TIMMY_AGENT_ID,
    priority: str = "normal",
    labels: list[str] | None = None,
) -> PaperclipIssue:
    """Build an open PaperclipIssue assigned to Timmy; every field overridable."""
    if labels is None:
        labels = []
    return PaperclipIssue(
        id=id,
        title=title,
        description=description,
        status=status,
        assignee_id=assignee_id,
        priority=priority,
        labels=labels,
    )
def _make_done(id: str = "issue-1", title: str = "Done") -> PaperclipIssue:
    """Build a minimal issue already in the ``done`` state."""
    return PaperclipIssue(id=id, title=title, status="done")
def _make_follow_up(id: str = "issue-2") -> PaperclipIssue:
    """Build the open follow-up issue Timmy assigns back to himself."""
    return PaperclipIssue(
        id=id,
        title="Follow-up: Muse about task automation",
        description="Automated follow-up from completed task",
        status="open",
        assignee_id=TIMMY_AGENT_ID,
        priority="normal",
    )
# ═══════════════════════════════════════════════════════════════════════════════
# PIPE WIRING: verify orchestrator is actually connected
# ═══════════════════════════════════════════════════════════════════════════════
class TestOrchestratorWiring:
    """Verify the orchestrator parameter actually connects to the pipe."""
    async def test_orchestrator_execute_task_is_called(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """When orchestrator is wired, process_task calls execute_task."""
        issue = _make_issue()
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        await runner.process_task(issue)
        # Exactly one orchestrator call, carrying the issue's id and title.
        assert len(stub_orchestrator.calls) == 1
        call = stub_orchestrator.calls[0]
        assert call["task_id"] == "issue-1"
        assert call["context"]["title"] == "Muse about task automation"
    async def test_orchestrator_receives_full_context(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """Context dict passed to execute_task includes all issue metadata."""
        issue = _make_issue(
            id="ctx-test",
            title="Context verification",
            priority="high",
            labels=["automation", "meta"],
        )
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        await runner.process_task(issue)
        ctx = stub_orchestrator.calls[0]["context"]
        assert ctx["issue_id"] == "ctx-test"
        assert ctx["title"] == "Context verification"
        assert ctx["priority"] == "high"
        assert ctx["labels"] == ["automation", "meta"]
    async def test_orchestrator_dict_result_unwrapped(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """When execute_task returns a dict, the 'result' key is extracted."""
        issue = _make_issue()
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        result = await runner.process_task(issue)
        # StubOrchestrator returns dict with "result" key
        assert "[Orchestrator]" in result
        assert "issue-1" in result
    async def test_orchestrator_string_result_passthrough(
        self,
        mock_client,
        bridge,
        settings_patch,
    ):
        """When execute_task returns a plain string, it passes through."""
        # Minimal inline orchestrator: returns a bare string, not a dict.
        class StringOrchestrator:
            async def execute_task(self, task_id, description, context):
                return f"Plain string result for {task_id}"
        runner = TaskRunner(bridge=bridge, orchestrator=StringOrchestrator())
        result = await runner.process_task(_make_issue())
        assert result == "Plain string result for issue-1"
    async def test_process_fn_overrides_orchestrator(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """Explicit process_fn takes priority over orchestrator."""
        async def override(task_id, desc, ctx):
            return "override wins"
        runner = TaskRunner(
            bridge=bridge,
            orchestrator=stub_orchestrator,
            process_fn=override,
        )
        result = await runner.process_task(_make_issue())
        assert result == "override wins"
        assert len(stub_orchestrator.calls) == 0  # orchestrator NOT called
# ═══════════════════════════════════════════════════════════════════════════════
# STEP 1: Timmy grabs the first task in queue
# ═══════════════════════════════════════════════════════════════════════════════
class TestGrabNextTask:
    """Verify Timmy picks the first open issue assigned to him."""
    async def test_grabs_first_assigned_issue(self, mock_client, bridge, settings_patch):
        """The first open issue assigned to Timmy is returned as-is."""
        issue = _make_issue()
        mock_client.list_issues.return_value = [issue]
        runner = TaskRunner(bridge=bridge)
        grabbed = await runner.grab_next_task()
        assert grabbed is not None
        assert grabbed.id == "issue-1"
        assert grabbed.assignee_id == TIMMY_AGENT_ID
        # Only open issues should ever be queried.
        mock_client.list_issues.assert_awaited_once_with(status="open")
    async def test_skips_issues_not_assigned_to_timmy(self, mock_client, bridge, settings_patch):
        """Issues assigned to other agents are passed over."""
        other = _make_issue(id="other-1", assignee_id="agent-codex")
        mine = _make_issue(id="timmy-1")
        mock_client.list_issues.return_value = [other, mine]
        runner = TaskRunner(bridge=bridge)
        grabbed = await runner.grab_next_task()
        assert grabbed.id == "timmy-1"
    async def test_returns_none_when_queue_empty(self, mock_client, bridge, settings_patch):
        """An empty queue yields None rather than raising."""
        mock_client.list_issues.return_value = []
        runner = TaskRunner(bridge=bridge)
        assert await runner.grab_next_task() is None
    async def test_returns_none_when_no_agent_id(self, mock_client, bridge, settings_patch):
        """With no configured agent id, grabbing bails out before any API call."""
        settings_patch.paperclip_agent_id = ""
        runner = TaskRunner(bridge=bridge)
        assert await runner.grab_next_task() is None
        mock_client.list_issues.assert_not_awaited()
    async def test_grabs_first_of_multiple(self, mock_client, bridge, settings_patch):
        """Queue order is respected: the head of the list wins."""
        issues = [_make_issue(id=f"t-{i}", title=f"Task {i}") for i in range(3)]
        mock_client.list_issues.return_value = issues
        runner = TaskRunner(bridge=bridge)
        assert (await runner.grab_next_task()).id == "t-0"
# ═══════════════════════════════════════════════════════════════════════════════
# STEP 2: Timmy processes the task through the orchestrator
# ═══════════════════════════════════════════════════════════════════════════════
class TestProcessTask:
    """Verify checkout + orchestrator invocation + result flow."""
    async def test_checkout_before_orchestrator(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """Issue must be checked out before orchestrator runs."""
        issue = _make_issue()
        # Mutable flag captured by the closure below; recorded at the moment
        # the orchestrator fires so ordering can be asserted afterwards.
        checkout_happened = {"before_execute": False}
        original_execute = stub_orchestrator.execute_task
        async def tracking_execute(task_id, desc, ctx):
            # True iff checkout_issue had already been awaited at this point.
            checkout_happened["before_execute"] = mock_client.checkout_issue.await_count > 0
            return await original_execute(task_id, desc, ctx)
        stub_orchestrator.execute_task = tracking_execute
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        await runner.process_task(issue)
        assert checkout_happened["before_execute"], "checkout must happen before execute_task"
    async def test_orchestrator_output_flows_to_result(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """The string returned by process_task comes from the orchestrator."""
        issue = _make_issue(id="flow-1", title="Flow verification", priority="high")
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        result = await runner.process_task(issue)
        # Verify orchestrator's output arrived — it references the input
        assert "Flow verification" in result
        assert "flow-1" in result
        assert "high" in result
    async def test_default_fallback_without_orchestrator(
        self,
        mock_client,
        bridge,
        settings_patch,
    ):
        """Without orchestrator or process_fn, a default message is returned."""
        issue = _make_issue(title="Fallback test")
        runner = TaskRunner(bridge=bridge)  # no orchestrator
        result = await runner.process_task(issue)
        assert "Fallback test" in result
# ═══════════════════════════════════════════════════════════════════════════════
# STEP 3: Timmy completes the task — comment + close
# ═══════════════════════════════════════════════════════════════════════════════
class TestCompleteTask:
    """Verify orchestrator output flows into the completion comment."""
    async def test_orchestrator_output_in_comment(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """The comment posted to Paperclip contains the orchestrator's output."""
        issue = _make_issue(id="cmt-1", title="Comment pipe test")
        mock_client.update_issue.return_value = _make_done("cmt-1")
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        # Process to get orchestrator output
        result = await runner.process_task(issue)
        # Complete to post it as comment
        await runner.complete_task(issue, result)
        # Second positional arg of add_comment is the comment body text.
        comment_content = mock_client.add_comment.call_args[0][1]
        assert "[Timmy]" in comment_content
        assert "[Orchestrator]" in comment_content
        assert "Comment pipe test" in comment_content
    async def test_marks_issue_done(
        self,
        mock_client,
        bridge,
        settings_patch,
    ):
        """complete_task flips the issue status to done via update_issue."""
        issue = _make_issue()
        mock_client.update_issue.return_value = _make_done()
        runner = TaskRunner(bridge=bridge)
        ok = await runner.complete_task(issue, "any result")
        assert ok is True
        # The update request passed to the client must carry status="done".
        update_req = mock_client.update_issue.call_args[0][1]
        assert update_req.status == "done"
    async def test_returns_false_on_close_failure(
        self,
        mock_client,
        bridge,
        settings_patch,
    ):
        """A failed update_issue call makes complete_task report False."""
        mock_client.update_issue.return_value = None
        runner = TaskRunner(bridge=bridge)
        assert await runner.complete_task(_make_issue(), "result") is False
# ═══════════════════════════════════════════════════════════════════════════════
# STEP 4: Follow-up creation with orchestrator output embedded
# ═══════════════════════════════════════════════════════════════════════════════
class TestCreateFollowUp:
    """Verify orchestrator output flows into the follow-up description."""
    async def test_follow_up_contains_orchestrator_output(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """The follow-up description includes the orchestrator's result text."""
        issue = _make_issue(id="fu-1", title="Follow-up pipe test")
        mock_client.create_issue.return_value = _make_follow_up()
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        result = await runner.process_task(issue)
        await runner.create_follow_up(issue, result)
        # First positional arg of create_issue is the create request object.
        create_req = mock_client.create_issue.call_args[0][0]
        # Orchestrator output should be embedded in description
        assert "[Orchestrator]" in create_req.description
        assert "fu-1" in create_req.description
    async def test_follow_up_assigned_to_self(
        self,
        mock_client,
        bridge,
        settings_patch,
    ):
        """Follow-ups are self-assigned so the loop keeps feeding Timmy."""
        mock_client.create_issue.return_value = _make_follow_up()
        runner = TaskRunner(bridge=bridge)
        await runner.create_follow_up(_make_issue(), "result")
        req = mock_client.create_issue.call_args[0][0]
        assert req.assignee_id == TIMMY_AGENT_ID
    async def test_follow_up_preserves_priority(
        self,
        mock_client,
        bridge,
        settings_patch,
    ):
        """The original issue's priority carries over to the follow-up."""
        mock_client.create_issue.return_value = _make_follow_up()
        runner = TaskRunner(bridge=bridge)
        await runner.create_follow_up(_make_issue(priority="high"), "result")
        req = mock_client.create_issue.call_args[0][0]
        assert req.priority == "high"
    async def test_follow_up_not_woken(self, mock_client, bridge, settings_patch):
        """Creating a follow-up must not wake the agent immediately."""
        mock_client.create_issue.return_value = _make_follow_up()
        runner = TaskRunner(bridge=bridge)
        await runner.create_follow_up(_make_issue(), "result")
        mock_client.wake_agent.assert_not_awaited()
    async def test_returns_none_on_failure(self, mock_client, bridge, settings_patch):
        """If issue creation fails, create_follow_up returns None."""
        mock_client.create_issue.return_value = None
        runner = TaskRunner(bridge=bridge)
        assert await runner.create_follow_up(_make_issue(), "r") is None
# ═══════════════════════════════════════════════════════════════════════════════
# FULL GREEN PATH: orchestrator wired end-to-end
# ═══════════════════════════════════════════════════════════════════════════════
class TestGreenPathWithOrchestrator:
    """Full pipe: TaskRunner → StubOrchestrator → bridge → mock_client.

    Proves orchestrator output propagates to every downstream artefact:
    the comment, the follow-up description, and the summary dict.
    """

    async def test_full_cycle_orchestrator_output_everywhere(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """Orchestrator result appears in comment, follow-up, and summary."""
        original = _make_issue(
            id="green-1",
            title="Muse about task automation and write a recursive task",
            description="Reflect on your task processing. Create a follow-up.",
            priority="high",
        )
        # Wire the mocked Paperclip client for one full cycle.
        mock_client.list_issues.return_value = [original]
        mock_client.update_issue.return_value = _make_done("green-1")
        mock_client.create_issue.return_value = _make_follow_up("green-fu")
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        summary = await runner.run_once()
        # ── Orchestrator was called with correct data
        assert len(stub_orchestrator.calls) == 1
        call = stub_orchestrator.calls[0]
        assert call["task_id"] == "green-1"
        assert call["context"]["priority"] == "high"
        assert "Reflect on your task processing" in call["description"]
        # ── Summary contains orchestrator output
        assert summary is not None
        assert summary["original_issue_id"] == "green-1"
        assert summary["completed"] is True
        assert summary["follow_up_issue_id"] == "green-fu"
        assert "[Orchestrator]" in summary["result"]
        assert "green-1" in summary["result"]
        # ── Comment posted contains orchestrator output
        comment_content = mock_client.add_comment.call_args[0][1]
        assert "[Timmy]" in comment_content
        assert "[Orchestrator]" in comment_content
        assert "high" in comment_content  # priority flowed through
        # ── Follow-up description contains orchestrator output
        follow_up_req = mock_client.create_issue.call_args[0][0]
        assert "[Orchestrator]" in follow_up_req.description
        assert "green-1" in follow_up_req.description
        assert follow_up_req.priority == "high"
        assert follow_up_req.assignee_id == TIMMY_AGENT_ID
        # ── Correct ordering of API calls
        mock_client.list_issues.assert_awaited_once()
        mock_client.checkout_issue.assert_awaited_once_with("green-1")
        mock_client.add_comment.assert_awaited_once()
        mock_client.update_issue.assert_awaited_once()
        assert mock_client.create_issue.await_count == 1

    async def test_no_tasks_returns_none(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """An empty issue list short-circuits: no orchestrator call, no summary."""
        mock_client.list_issues.return_value = []
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        assert await runner.run_once() is None
        assert len(stub_orchestrator.calls) == 0

    async def test_close_failure_still_creates_follow_up(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """Even when closing the issue fails, the follow-up is still created."""
        mock_client.list_issues.return_value = [_make_issue()]
        mock_client.update_issue.return_value = None  # close fails
        mock_client.create_issue.return_value = _make_follow_up()
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        summary = await runner.run_once()
        assert summary["completed"] is False
        assert summary["follow_up_issue_id"] == "issue-2"
        assert len(stub_orchestrator.calls) == 1
# ═══════════════════════════════════════════════════════════════════════════════
# EXTERNAL INJECTION: task from Paperclip API → orchestrator processes it
# ═══════════════════════════════════════════════════════════════════════════════
class TestExternalTaskInjection:
    """External system creates a task → Timmy's orchestrator processes it."""

    async def test_external_task_flows_through_orchestrator(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """An externally-injected issue is routed through the orchestrator."""
        injected = _make_issue(
            id="ext-1",
            title="Review quarterly metrics",
            description="Analyze Q1 metrics and prepare summary.",
        )
        mock_client.list_issues.return_value = [injected]
        mock_client.update_issue.return_value = _make_done("ext-1")
        mock_client.create_issue.return_value = _make_follow_up("ext-fu")
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        summary = await runner.run_once()
        # The orchestrator received the external task ...
        first_call = stub_orchestrator.calls[0]
        assert first_call["task_id"] == "ext-1"
        assert "Analyze Q1 metrics" in first_call["description"]
        # ... and its output flowed back out to Paperclip.
        assert "[Orchestrator]" in summary["result"]
        assert "Review quarterly metrics" in summary["result"]

    async def test_skips_tasks_for_other_agents(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """Issues assigned to other agents are ignored; only Timmy's is run."""
        foreign = _make_issue(id="other-1", assignee_id="agent-codex")
        own = _make_issue(id="mine-1", title="My task")
        mock_client.list_issues.return_value = [foreign, own]
        mock_client.update_issue.return_value = _make_done("mine-1")
        mock_client.create_issue.return_value = _make_follow_up()
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        summary = await runner.run_once()
        assert summary["original_issue_id"] == "mine-1"
        mock_client.checkout_issue.assert_awaited_once_with("mine-1")
# ═══════════════════════════════════════════════════════════════════════════════
# RECURSIVE CHAIN: follow-up → grabbed → orchestrator → follow-up → ...
# ═══════════════════════════════════════════════════════════════════════════════
class TestRecursiveChain:
    """Multi-cycle chains where each follow-up becomes the next task."""

    async def test_two_cycle_chain(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """Cycle 1 processes A and spawns B; cycle 2 processes B and spawns C."""
        task_a = _make_issue(id="A", title="Initial musing")
        fu_b = PaperclipIssue(
            id="B",
            title="Follow-up: Initial musing",
            description="Continue",
            status="open",
            assignee_id=TIMMY_AGENT_ID,
            priority="normal",
        )
        fu_c = PaperclipIssue(
            id="C",
            title="Follow-up: Follow-up",
            status="open",
            assignee_id=TIMMY_AGENT_ID,
        )
        # Cycle 1
        mock_client.list_issues.return_value = [task_a]
        mock_client.update_issue.return_value = _make_done("A")
        mock_client.create_issue.return_value = fu_b
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        s1 = await runner.run_once()
        assert s1["original_issue_id"] == "A"
        assert s1["follow_up_issue_id"] == "B"
        # Cycle 2: follow-up B is now the task
        mock_client.list_issues.return_value = [fu_b]
        mock_client.update_issue.return_value = _make_done("B")
        mock_client.create_issue.return_value = fu_c
        s2 = await runner.run_once()
        assert s2["original_issue_id"] == "B"
        assert s2["follow_up_issue_id"] == "C"
        # Orchestrator was called twice — once per cycle
        assert len(stub_orchestrator.calls) == 2
        assert stub_orchestrator.calls[0]["task_id"] == "A"
        assert stub_orchestrator.calls[1]["task_id"] == "B"

    async def test_three_cycle_chain_all_through_orchestrator(
        self,
        mock_client,
        bridge,
        stub_orchestrator,
        settings_patch,
    ):
        """Three cycles — every task goes through the orchestrator pipe."""
        tasks = [_make_issue(id=f"c-{i}", title=f"Chain {i}") for i in range(3)]
        follow_ups = [
            PaperclipIssue(
                id=f"c-{i + 1}",
                title=f"Follow-up: Chain {i}",
                status="open",
                assignee_id=TIMMY_AGENT_ID,
            )
            for i in range(3)
        ]
        runner = TaskRunner(bridge=bridge, orchestrator=stub_orchestrator)
        ids = []
        for i in range(3):
            # Re-point the shared mocks for this cycle before running it.
            mock_client.list_issues.return_value = [tasks[i]]
            mock_client.update_issue.return_value = _make_done(tasks[i].id)
            mock_client.create_issue.return_value = follow_ups[i]
            s = await runner.run_once()
            ids.append(s["original_issue_id"])
        assert ids == ["c-0", "c-1", "c-2"]
        assert len(stub_orchestrator.calls) == 3
# ═══════════════════════════════════════════════════════════════════════════════
# LIFECYCLE: start/stop
# ═══════════════════════════════════════════════════════════════════════════════
class TestLifecycle:
    """Start/stop lifecycle behaviour of the TaskRunner loop."""

    async def test_stop_halts_loop(self, mock_client, bridge, settings_patch):
        """stop() clears the internal running flag."""
        task_runner = TaskRunner(bridge=bridge)
        task_runner._running = True
        task_runner.stop()
        assert task_runner._running is False

    async def test_start_disabled_when_interval_zero(
        self,
        mock_client,
        bridge,
        settings_patch,
    ):
        """A zero poll interval disables polling entirely — no issues listed."""
        settings_patch.paperclip_poll_interval = 0
        task_runner = TaskRunner(bridge=bridge)
        await task_runner.start()
        mock_client.list_issues.assert_not_awaited()
# ═══════════════════════════════════════════════════════════════════════════════
# LIVE LLM (manual e2e): runs only when Ollama is available
# ═══════════════════════════════════════════════════════════════════════════════
def _ollama_reachable() -> tuple[bool, list[str]]:
"""Return (reachable, model_names)."""
try:
import httpx
resp = httpx.get("http://localhost:11434/api/tags", timeout=3)
resp.raise_for_status()
names = [m["name"] for m in resp.json().get("models", [])]
return True, names
except Exception:
return False, []
def _pick_tiny_model(available: list[str]) -> str | None:
"""Pick the smallest model available for e2e tests."""
candidates = ["tinyllama", "phi", "qwen2:0.5b", "llama3.2:1b", "gemma:2b"]
for candidate in candidates:
for name in available:
if candidate in name:
return name
return None
class LiveOllamaOrchestrator:
    """Thin orchestrator that calls Ollama directly — no Agno dependency."""

    def __init__(self, model_name: str) -> None:
        # Model tag to run, plus a record of every execute_task invocation.
        self.model_name = model_name
        self.calls: list[dict] = []

    async def execute_task(self, task_id: str, description: str, context: dict) -> str:
        """Run one generation against the local Ollama API and return its text."""
        import httpx as hx

        self.calls.append({"task_id": task_id, "description": description})
        payload = {
            "model": self.model_name,
            "prompt": (
                f"You are Timmy, a task automation agent. "
                f"Task: {description}\n"
                f"Respond in 1-2 sentences about what you did."
            ),
            "stream": False,
            "options": {"num_predict": 64},
        }
        async with hx.AsyncClient(timeout=60) as client:
            reply = await client.post(
                "http://localhost:11434/api/generate",
                json=payload,
            )
            reply.raise_for_status()
            return reply.json()["response"]
@pytest.mark.ollama
class TestLiveOllamaGreenPath:
    """Green-path with a real tiny LLM via Ollama.

    Run with: ``tox -e ollama`` or ``pytest -m ollama``
    Requires: Ollama running with a small model.
    """

    async def test_live_full_cycle(self, mock_client, bridge, settings_patch):
        """Wire a real tiny LLM through the full pipe and verify output."""
        reachable, models = _ollama_reachable()
        if not reachable:
            pytest.skip("Ollama not reachable at localhost:11434")
        chosen = _pick_tiny_model(models)
        if not chosen:
            pytest.skip(f"No tiny model found (have: {models[:5]})")
        issue = _make_issue(
            id="live-1",
            title="Reflect on task automation",
            description="Muse about how you process tasks and suggest improvements.",
        )
        # Paperclip side stays mocked — only the LLM call is real.
        mock_client.list_issues.return_value = [issue]
        mock_client.update_issue.return_value = _make_done("live-1")
        mock_client.create_issue.return_value = _make_follow_up("live-fu")
        live_orch = LiveOllamaOrchestrator(chosen)
        runner = TaskRunner(bridge=bridge, orchestrator=live_orch)
        summary = await runner.run_once()
        # The LLM produced *something* non-empty
        assert summary is not None
        assert len(summary["result"]) > 0
        assert summary["completed"] is True
        assert summary["follow_up_issue_id"] == "live-fu"
        # Orchestrator was actually called
        assert len(live_orch.calls) == 1
        assert live_orch.calls[0]["task_id"] == "live-1"
        # LLM output flowed into the Paperclip comment
        comment = mock_client.add_comment.call_args[0][1]
        assert "[Timmy]" in comment
        assert len(comment) > len("[Timmy] Task completed.\n\n")
        # LLM output flowed into the follow-up description
        fu_req = mock_client.create_issue.call_args[0][0]
        assert len(fu_req.description) > 0
        assert fu_req.assignee_id == TIMMY_AGENT_ID

    async def test_live_recursive_chain(self, mock_client, bridge, settings_patch):
        """Two-cycle chain with a real LLM — each cycle produces real output."""
        reachable, models = _ollama_reachable()
        if not reachable:
            pytest.skip("Ollama not reachable")
        chosen = _pick_tiny_model(models)
        if not chosen:
            pytest.skip("No tiny model found")
        task_a = _make_issue(id="live-A", title="Initial reflection")
        fu_b = PaperclipIssue(
            id="live-B",
            title="Follow-up: Initial reflection",
            description="Continue reflecting",
            status="open",
            assignee_id=TIMMY_AGENT_ID,
            priority="normal",
        )
        fu_c = PaperclipIssue(
            id="live-C",
            title="Follow-up: Follow-up",
            status="open",
            assignee_id=TIMMY_AGENT_ID,
        )
        live_orch = LiveOllamaOrchestrator(chosen)
        runner = TaskRunner(bridge=bridge, orchestrator=live_orch)
        # Cycle 1
        mock_client.list_issues.return_value = [task_a]
        mock_client.update_issue.return_value = _make_done("live-A")
        mock_client.create_issue.return_value = fu_b
        s1 = await runner.run_once()
        assert s1 is not None
        assert len(s1["result"]) > 0
        # Cycle 2
        mock_client.list_issues.return_value = [fu_b]
        mock_client.update_issue.return_value = _make_done("live-B")
        mock_client.create_issue.return_value = fu_c
        s2 = await runner.run_once()
        assert s2 is not None
        assert len(s2["result"]) > 0
        # Both cycles went through the LLM
        assert len(live_orch.calls) == 2

View File

@@ -1,234 +0,0 @@
"""Chunk 2: OpenFang HTTP client — test first, implement second.
Tests cover:
- Health check returns False when unreachable
- Health check TTL caching
- execute_hand() rejects unknown hands
- execute_hand() success with mocked HTTP
- execute_hand() graceful degradation on error
- Convenience wrappers call the correct hand
"""
import json
from unittest.mock import MagicMock, patch
import pytest
# ---------------------------------------------------------------------------
# Health checks
# ---------------------------------------------------------------------------
def test_health_check_false_when_unreachable():
    """Client should report unhealthy when OpenFang is not running."""
    from infrastructure.openfang.client import OpenFangClient

    # Port 19999 is assumed closed locally, so the probe must fail.
    probe = OpenFangClient(base_url="http://localhost:19999")
    assert probe._check_health() is False
def test_health_check_caching():
    """Repeated .healthy calls within TTL should not re-check."""
    from infrastructure.openfang.client import OpenFangClient

    cached_client = OpenFangClient(base_url="http://localhost:19999")
    cached_client._health_cache_ttl = 9999  # effectively never expires
    # First access performs a real (failing) probe and caches the result.
    _ = cached_client.healthy
    assert cached_client._healthy is False
    # Flip the cached flag directly — the next read must come from the cache.
    cached_client._healthy = True
    assert cached_client.healthy is True  # no re-check happened
# ---------------------------------------------------------------------------
# execute_hand — unknown hand
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_execute_hand_unknown_hand():
    """Requesting an unknown hand returns success=False immediately."""
    from infrastructure.openfang.client import OpenFangClient

    fang = OpenFangClient(base_url="http://localhost:19999")
    outcome = await fang.execute_hand("nonexistent_hand", {})
    # No HTTP call is needed to reject a hand that does not exist.
    assert outcome.success is False
    assert "Unknown hand" in outcome.error
# ---------------------------------------------------------------------------
# execute_hand — success path (mocked HTTP)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_execute_hand_success_mocked():
    """When OpenFang returns 200 with output, HandResult.success is True."""
    from infrastructure.openfang.client import OpenFangClient

    # Canned JSON body that the fake urlopen().read() will return.
    response_body = json.dumps(
        {
            "success": True,
            "output": "Page loaded successfully",
            "metadata": {"url": "https://example.com"},
        }
    ).encode()
    # Stub HTTPResponse: urlopen yields a context manager with .status/.read().
    mock_resp = MagicMock()
    mock_resp.status = 200
    mock_resp.read.return_value = response_body
    mock_resp.__enter__ = lambda s: s
    mock_resp.__exit__ = MagicMock(return_value=False)
    with patch("urllib.request.urlopen", return_value=mock_resp):
        client = OpenFangClient(base_url="http://localhost:8080")
        result = await client.execute_hand("browser", {"url": "https://example.com"})
        assert result.success is True
        assert result.output == "Page loaded successfully"
        assert result.hand == "browser"
        # Latency is measured around the (mocked) call, so it is strictly positive.
        assert result.latency_ms > 0
# ---------------------------------------------------------------------------
# execute_hand — graceful degradation on connection error
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_execute_hand_connection_error():
    """When OpenFang is unreachable, HandResult.success is False (no crash)."""
    from infrastructure.openfang.client import OpenFangClient

    unreachable = OpenFangClient(base_url="http://localhost:19999")
    outcome = await unreachable.execute_hand("browser", {"url": "https://example.com"})
    assert outcome.success is False
    assert outcome.error  # a non-empty error message is required
    assert outcome.hand == "browser"
# ---------------------------------------------------------------------------
# Convenience wrappers
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_browse_calls_browser_hand():
    """browse() should delegate to execute_hand('browser', ...)."""
    from infrastructure.openfang.client import OpenFangClient

    client = OpenFangClient(base_url="http://localhost:19999")
    recorded = []
    real_execute = client.execute_hand

    async def recording_execute(hand, params, **kw):
        # Record the delegation, then defer to the real implementation.
        recorded.append((hand, params))
        return await real_execute(hand, params, **kw)

    client.execute_hand = recording_execute
    await client.browse("https://example.com", "click button")
    assert len(recorded) == 1
    hand_name, hand_params = recorded[0]
    assert hand_name == "browser"
    assert hand_params["url"] == "https://example.com"
@pytest.mark.asyncio
async def test_collect_calls_collector_hand():
    """collect() should delegate to execute_hand('collector', ...)."""
    from infrastructure.openfang.client import OpenFangClient

    client = OpenFangClient(base_url="http://localhost:19999")
    recorded = []
    real_execute = client.execute_hand

    async def recording_execute(hand, params, **kw):
        # Record the delegation, then defer to the real implementation.
        recorded.append((hand, params))
        return await real_execute(hand, params, **kw)

    client.execute_hand = recording_execute
    await client.collect("example.com", depth="deep")
    assert len(recorded) == 1
    hand_name, hand_params = recorded[0]
    assert hand_name == "collector"
    assert hand_params["target"] == "example.com"
@pytest.mark.asyncio
async def test_predict_calls_predictor_hand():
    """predict() should delegate to execute_hand('predictor', ...)."""
    from infrastructure.openfang.client import OpenFangClient

    client = OpenFangClient(base_url="http://localhost:19999")
    recorded = []
    real_execute = client.execute_hand

    async def recording_execute(hand, params, **kw):
        # Record the delegation, then defer to the real implementation.
        recorded.append((hand, params))
        return await real_execute(hand, params, **kw)

    client.execute_hand = recording_execute
    await client.predict("Will BTC hit 100k?", horizon="1m")
    assert len(recorded) == 1
    hand_name, hand_params = recorded[0]
    assert hand_name == "predictor"
    assert hand_params["question"] == "Will BTC hit 100k?"
# ---------------------------------------------------------------------------
# HandResult dataclass
# ---------------------------------------------------------------------------
def test_hand_result_defaults():
    """HandResult should have sensible defaults."""
    from infrastructure.openfang.client import HandResult

    result = HandResult(hand="browser", success=True)
    # All optional fields fall back to empty/zero values.
    assert (result.output, result.error) == ("", "")
    assert result.latency_ms == 0.0
    assert result.metadata == {}
# ---------------------------------------------------------------------------
# OPENFANG_HANDS constant
# ---------------------------------------------------------------------------
def test_openfang_hands_tuple():
    """The OPENFANG_HANDS constant should list all 7 hands."""
    from infrastructure.openfang.client import OPENFANG_HANDS

    expected = {
        "browser",
        "collector",
        "predictor",
        "lead",
        "twitter",
        "researcher",
        "clip",
    }
    assert len(OPENFANG_HANDS) == 7
    assert expected.issubset(OPENFANG_HANDS)
# ---------------------------------------------------------------------------
# status() summary
# ---------------------------------------------------------------------------
def test_status_returns_summary():
    """status() should return a dict with url, healthy flag, and hands list."""
    from infrastructure.openfang.client import OpenFangClient

    summary = OpenFangClient(base_url="http://localhost:19999").status()
    for key in ("url", "healthy", "available_hands"):
        assert key in summary
    assert len(summary["available_hands"]) == 7

View File

@@ -1,25 +0,0 @@
"""Chunk 1: OpenFang config settings — test first, implement second."""
def test_openfang_url_default():
    """Settings should expose openfang_url with a sensible default."""
    from config import settings

    # getattr with a None sentinel fails the same way when the attr is missing.
    assert getattr(settings, "openfang_url", None) == "http://localhost:8080"


def test_openfang_enabled_default_false():
    """OpenFang integration should be opt-in (disabled by default)."""
    from config import settings

    assert getattr(settings, "openfang_enabled", None) is False


def test_openfang_timeout_default():
    """Timeout should be generous (some hands are slow)."""
    from config import settings

    assert getattr(settings, "openfang_timeout", None) == 120

View File

@@ -1,43 +0,0 @@
"""Paperclip AI config settings."""
def test_paperclip_url_default():
    """Paperclip API base URL defaults to the local dev server."""
    from config import settings

    # getattr with a None sentinel fails the same way when the attr is missing.
    assert getattr(settings, "paperclip_url", None) == "http://localhost:3100"


def test_paperclip_enabled_default_false():
    """Paperclip integration is opt-in (disabled by default)."""
    from config import settings

    assert getattr(settings, "paperclip_enabled", None) is False


def test_paperclip_timeout_default():
    """HTTP timeout for Paperclip calls defaults to 30 seconds."""
    from config import settings

    assert getattr(settings, "paperclip_timeout", None) == 30


def test_paperclip_agent_id_default_empty():
    """Agent id is unset (empty string) until configured."""
    from config import settings

    assert getattr(settings, "paperclip_agent_id", None) == ""


def test_paperclip_company_id_default_empty():
    """Company id is unset (empty string) until configured."""
    from config import settings

    assert getattr(settings, "paperclip_company_id", None) == ""


def test_paperclip_poll_interval_default_zero():
    """Polling is disabled by default (interval of zero)."""
    from config import settings

    assert getattr(settings, "paperclip_poll_interval", None) == 0

View File

@@ -103,16 +103,8 @@ class TestFeaturePages:
r = client.get("/models")
assert r.status_code == 200
def test_swarm_live(self, client):
r = client.get("/swarm/live")
assert r.status_code == 200
def test_swarm_events(self, client):
r = client.get("/swarm/events")
assert r.status_code == 200
def test_marketplace(self, client):
r = client.get("/marketplace")
def test_memory_page(self, client):
r = client.get("/memory")
assert r.status_code in (200, 307)
@@ -162,10 +154,6 @@ class TestAPIEndpoints:
r = client.get("/api/notifications")
assert r.status_code == 200
def test_providers_api(self, client):
r = client.get("/router/api/providers")
assert r.status_code == 200
def test_mobile_status(self, client):
r = client.get("/mobile/status")
assert r.status_code == 200
@@ -182,10 +170,6 @@ class TestAPIEndpoints:
r = client.get("/grok/status")
assert r.status_code == 200
def test_paperclip_status(self, client):
r = client.get("/api/paperclip/status")
assert r.status_code == 200
# ---------------------------------------------------------------------------
# No 500s — every GET route should survive without server error
@@ -223,19 +207,14 @@ class TestNo500:
"/mobile/status",
"/spark",
"/models",
"/swarm/live",
"/swarm/events",
"/marketplace",
"/api/queue/status",
"/api/tasks",
"/api/chat/history",
"/api/notifications",
"/router/api/providers",
"/discord/status",
"/telegram/status",
"/grok/status",
"/grok/stats",
"/api/paperclip/status",
],
)
def test_no_500(self, client, path):

View File

@@ -1,265 +0,0 @@
"""Tests for timmy.agents.timmy — orchestrator, personas, context building."""
import sys
from unittest.mock import MagicMock, patch
# Ensure mcp.registry stub with tool_registry exists before importing agents.
# NOTE(review): presumably the agents module reads mcp.registry.tool_registry
# at import time, so the stub must be installed first — confirm against
# timmy.agents.timmy's imports.
if "mcp" not in sys.modules:
    _mock_mcp = MagicMock()
    _mock_registry_mod = MagicMock()
    _mock_tool_reg = MagicMock()
    # get_handler returning None stands in for "no tool registered".
    _mock_tool_reg.get_handler.return_value = None
    _mock_registry_mod.tool_registry = _mock_tool_reg
    sys.modules["mcp"] = _mock_mcp
    sys.modules["mcp.registry"] = _mock_registry_mod
from timmy.agents.timmy import (
_PERSONAS,
ORCHESTRATOR_PROMPT_BASE,
TimmyOrchestrator,
_load_hands_async,
build_timmy_context_async,
build_timmy_context_sync,
create_timmy_swarm,
format_timmy_prompt,
)
class TestLoadHandsAsync:
    """Test _load_hands_async."""

    async def test_returns_empty_list(self):
        """Loading hands resolves to an empty list."""
        hands = await _load_hands_async()
        assert hands == []
class TestBuildContext:
    """Test context building functions."""

    @patch("timmy.agents.timmy.settings")
    def test_build_context_sync_graceful_failures(self, mock_settings):
        """With a bogus repo root, every context field still degrades gracefully."""
        mock_settings.repo_root = "/nonexistent"
        ctx = build_timmy_context_sync()
        assert "timestamp" in ctx
        assert isinstance(ctx["agents"], list)
        assert isinstance(ctx["hands"], list)
        # Git log should fall back gracefully
        assert isinstance(ctx["git_log"], str)
        # Memory should fall back gracefully
        assert isinstance(ctx["memory"], str)

    @patch("timmy.agents.timmy.settings")
    async def test_build_context_async(self, mock_settings):
        """The async variant also tolerates a missing repo root."""
        mock_settings.repo_root = "/nonexistent"
        ctx = await build_timmy_context_async()
        assert ctx["hands"] == []

    @patch("timmy.agents.timmy.settings")
    def test_build_context_reads_memory_file(self, mock_settings, tmp_path):
        """Context picks up the contents of MEMORY.md when one exists."""
        memory_file = tmp_path / "MEMORY.md"
        memory_file.write_text("# Important memories\nRemember this.")
        mock_settings.repo_root = str(tmp_path)
        # Patch HotMemory path so it reads from tmp_path
        from timmy.memory_system import memory_system

        original_path = memory_system.hot.path
        memory_system.hot.path = memory_file
        memory_system.hot._content = None  # Clear cache
        try:
            ctx = build_timmy_context_sync()
            assert "Important memories" in ctx["memory"]
        finally:
            # Restore the shared singleton so later tests see pristine state.
            memory_system.hot.path = original_path
            memory_system.hot._content = None
class TestFormatPrompt:
    """Test format_timmy_prompt."""

    @staticmethod
    def _ctx(**overrides):
        """Minimal context dict; keyword overrides replace individual fields."""
        base = {
            "timestamp": "now",
            "repo_root": "/tmp",
            "git_log": "",
            "agents": [],
            "hands": [],
            "memory": "",
        }
        base.update(overrides)
        return base

    def test_inserts_context_block(self):
        """Base text, git log, and memory all appear in the rendered prompt."""
        rendered = format_timmy_prompt(
            "Line one.\nLine two.",
            self._ctx(
                timestamp="2026-03-06T00:00:00Z",
                repo_root="/home/user/project",
                git_log="abc123 initial commit",
                memory="some memory",
            ),
        )
        assert "Line one." in rendered
        assert "Line two." in rendered
        assert "abc123 initial commit" in rendered
        assert "some memory" in rendered

    def test_agents_list_formatted(self):
        """Each registered agent's name shows up in the prompt."""
        rendered = format_timmy_prompt(
            "Base.",
            self._ctx(
                agents=[
                    {"name": "Forge", "capabilities": "code", "status": "ready"},
                    {"name": "Seer", "capabilities": "research", "status": "ready"},
                ]
            ),
        )
        assert "Forge" in rendered
        assert "Seer" in rendered

    def test_hands_list_formatted(self):
        """The hand's name and enabled flag show up in the prompt."""
        rendered = format_timmy_prompt(
            "Base.",
            self._ctx(hands=[{"name": "backup", "schedule": "daily", "enabled": True}]),
        )
        assert "backup" in rendered
        assert "enabled" in rendered

    def test_repo_root_placeholder_replaced(self):
        """The {REPO_ROOT} placeholder is substituted with the real path."""
        rendered = format_timmy_prompt(
            "Root is {REPO_ROOT}.", self._ctx(repo_root="/my/repo")
        )
        assert "/my/repo" in rendered
        assert "{REPO_ROOT}" not in rendered
class TestExtractAgent:
    """Test TimmyOrchestrator._extract_agent static method."""

    def test_extracts_known_agents(self):
        """Each known agent name is recognised anywhere in the text."""
        expectations = {
            "Primary Agent: Seer": "seer",
            "Use Forge for this": "forge",
            "Route to quill": "quill",
            "echo can recall": "echo",
            "helm decides": "helm",
        }
        for text, agent in expectations.items():
            assert TimmyOrchestrator._extract_agent(text) == agent

    def test_defaults_to_orchestrator(self):
        """Unrecognised text falls back to the orchestrator itself."""
        assert TimmyOrchestrator._extract_agent("no agent mentioned") == "orchestrator"

    def test_case_insensitive(self):
        """Matching ignores letter case."""
        assert TimmyOrchestrator._extract_agent("Use FORGE") == "forge"
class TestTimmyOrchestrator:
    """Test TimmyOrchestrator init and methods."""

    @patch("timmy.agents.timmy.settings")
    def test_init(self, mock_settings):
        """A fresh orchestrator starts with no sub-agents and no session."""
        mock_settings.repo_root = "/tmp"
        mock_settings.ollama_model = "test"
        mock_settings.ollama_url = "http://localhost:11434"
        mock_settings.telemetry_enabled = False
        orch = TimmyOrchestrator()
        assert orch.agent_id == "orchestrator"
        assert orch.name == "Orchestrator"
        assert orch.sub_agents == {}
        assert orch._session_initialized is False

    @patch("timmy.agents.timmy.settings")
    def test_register_sub_agent(self, mock_settings):
        """register_sub_agent keys registered agents by their agent_id."""
        mock_settings.repo_root = "/tmp"
        mock_settings.ollama_model = "test"
        mock_settings.ollama_url = "http://localhost:11434"
        mock_settings.telemetry_enabled = False
        orch = TimmyOrchestrator()
        from timmy.agents.base import SubAgent

        agent = SubAgent(
            agent_id="test-agent",
            name="Test",
            role="test",
            system_prompt="You are a test agent.",
        )
        orch.register_sub_agent(agent)
        assert "test-agent" in orch.sub_agents

    @patch("timmy.agents.timmy.settings")
    def test_get_swarm_status(self, mock_settings):
        """Swarm status counts the orchestrator itself as one agent."""
        mock_settings.repo_root = "/tmp"
        mock_settings.ollama_model = "test"
        mock_settings.ollama_url = "http://localhost:11434"
        mock_settings.telemetry_enabled = False
        orch = TimmyOrchestrator()
        status = orch.get_swarm_status()
        assert "orchestrator" in status
        assert status["total_agents"] == 1

    @patch("timmy.agents.timmy.settings")
    def test_get_enhanced_system_prompt_with_attr(self, mock_settings):
        """The enhanced prompt embeds the stored system_prompt text."""
        mock_settings.repo_root = "/tmp"
        mock_settings.ollama_model = "test"
        mock_settings.ollama_url = "http://localhost:11434"
        mock_settings.telemetry_enabled = False
        orch = TimmyOrchestrator()
        # BaseAgent doesn't store system_prompt as attr; set it manually
        orch.system_prompt = "Test prompt.\nWith context."
        prompt = orch._get_enhanced_system_prompt()
        assert isinstance(prompt, str)
        assert "Test prompt." in prompt
class TestCreateTimmySwarm:
    """Test create_timmy_swarm factory."""

    @patch("timmy.agents.timmy.settings")
    def test_creates_all_personas(self, mock_settings):
        """The factory registers one sub-agent per persona definition."""
        mock_settings.repo_root = "/tmp"
        mock_settings.ollama_model = "test"
        mock_settings.ollama_url = "http://localhost:11434"
        mock_settings.telemetry_enabled = False
        swarm = create_timmy_swarm()
        assert len(swarm.sub_agents) == len(_PERSONAS)
        for agent_id in ("seer", "forge", "quill", "echo", "helm"):
            assert agent_id in swarm.sub_agents
class TestPersonas:
    """Test persona definitions."""

    def test_all_personas_have_required_fields(self):
        """Every persona declares id, name, role and system prompt."""
        required = {"agent_id", "name", "role", "system_prompt"}
        for persona in _PERSONAS:
            missing = required - persona.keys()
            assert not missing, f"Missing fields in {persona['name']}"

    def test_persona_ids_unique(self):
        """No two personas share an agent_id."""
        ids = [persona["agent_id"] for persona in _PERSONAS]
        assert len(set(ids)) == len(ids)

    def test_six_personas(self):
        """Exactly six personas are defined."""
        assert len(_PERSONAS) == 6
class TestOrchestratorPrompt:
    """Test the ORCHESTRATOR_PROMPT_BASE constant."""

    def test_contains_hard_rules(self):
        """The anti-fabrication hard rules are present in the base prompt."""
        assert "NEVER fabricate" in ORCHESTRATOR_PROMPT_BASE
        lowered = ORCHESTRATOR_PROMPT_BASE.lower()
        assert "do not know" in lowered

    def test_contains_repo_root_placeholder(self):
        """The repo-root placeholder must survive for later substitution."""
        assert "{REPO_ROOT}" in ORCHESTRATOR_PROMPT_BASE

View File

@@ -6,12 +6,42 @@ import pytest
from timmy.mcp_tools import (
_bridge_to_work_order,
_parse_command,
close_mcp_sessions,
create_filesystem_mcp_tools,
create_gitea_issue_via_mcp,
create_gitea_mcp_tools,
)
# ---------------------------------------------------------------------------
# _parse_command
# ---------------------------------------------------------------------------
def test_parse_command_splits_correctly():
"""_parse_command splits a command string into executable and args."""
with patch("timmy.mcp_tools.shutil.which", return_value="/usr/local/bin/gitea-mcp"):
exe, args = _parse_command("gitea-mcp -t stdio")
assert exe == "/usr/local/bin/gitea-mcp"
assert args == ["-t", "stdio"]
def test_parse_command_expands_tilde():
"""_parse_command expands ~/."""
with patch("timmy.mcp_tools.shutil.which", return_value=None):
exe, args = _parse_command("~/go/bin/gitea-mcp -t stdio")
assert "/go/bin/gitea-mcp" in exe
assert "~" not in exe
assert args == ["-t", "stdio"]
def test_parse_command_preserves_absolute_path():
"""_parse_command preserves an absolute path without calling which."""
exe, args = _parse_command("/usr/local/bin/gitea-mcp -t stdio")
assert exe == "/usr/local/bin/gitea-mcp"
assert args == ["-t", "stdio"]
# ---------------------------------------------------------------------------
# create_gitea_mcp_tools
# ---------------------------------------------------------------------------
@@ -36,25 +66,26 @@ def test_gitea_mcp_returns_none_when_no_token():
def test_gitea_mcp_returns_tools_when_configured():
"""Gitea MCP factory returns an MCPTools instance when properly configured."""
"""Gitea MCP factory returns an MCPTools instance using server_params."""
mock_mcp = MagicMock()
mock_params = MagicMock()
with (
patch("timmy.mcp_tools.settings") as mock_settings,
patch("agno.tools.mcp.MCPTools", return_value=mock_mcp) as mock_cls,
patch("timmy.mcp_tools._gitea_server_params", return_value=mock_params),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok123"
mock_settings.mcp_gitea_command = "gitea-mcp -t stdio"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.mcp_timeout = 15
result = create_gitea_mcp_tools()
assert result is mock_mcp
mock_cls.assert_called_once()
call_kwargs = mock_cls.call_args[1]
assert call_kwargs["command"] == "gitea-mcp -t stdio"
assert call_kwargs["env"]["GITEA_ACCESS_TOKEN"] == "tok123"
assert "create_issue" in call_kwargs["include_tools"]
assert call_kwargs["server_params"] is mock_params
assert "command" not in call_kwargs
assert "issue_write" in call_kwargs["include_tools"]
assert "pull_request_write" in call_kwargs["include_tools"]
def test_gitea_mcp_graceful_on_import_error():
@@ -76,11 +107,14 @@ def test_gitea_mcp_graceful_on_import_error():
def test_filesystem_mcp_returns_tools():
"""Filesystem MCP factory returns an MCPTools instance."""
"""Filesystem MCP factory returns an MCPTools instance using server_params."""
mock_mcp = MagicMock()
mock_params_cls = MagicMock()
with (
patch("timmy.mcp_tools.settings") as mock_settings,
patch("agno.tools.mcp.MCPTools", return_value=mock_mcp) as mock_cls,
patch("mcp.client.stdio.StdioServerParameters", mock_params_cls),
patch("timmy.mcp_tools.shutil.which", return_value="/usr/local/bin/npx"),
):
mock_settings.mcp_filesystem_command = "npx -y @modelcontextprotocol/server-filesystem"
mock_settings.repo_root = "/home/user/project"
@@ -89,8 +123,11 @@ def test_filesystem_mcp_returns_tools():
assert result is mock_mcp
call_kwargs = mock_cls.call_args[1]
assert "/home/user/project" in call_kwargs["command"]
assert "server_params" in call_kwargs
assert "read_file" in call_kwargs["include_tools"]
# Verify StdioServerParameters was called with repo_root as an arg
params_kwargs = mock_params_cls.call_args[1]
assert "/home/user/project" in params_kwargs["args"]
# ---------------------------------------------------------------------------
@@ -110,23 +147,29 @@ async def test_issue_via_mcp_returns_not_configured():
@pytest.mark.asyncio
async def test_issue_via_mcp_calls_tool():
"""Issue creation calls the MCP tool with correct arguments."""
"""Issue creation calls session.call_tool with correct arguments."""
import timmy.mcp_tools as mcp_mod
mock_session = MagicMock()
mock_session.connect = AsyncMock()
mock_session.call_tool = AsyncMock(return_value="Issue #42 created")
mock_session._connected = False
# Mock the inner MCP session (tools.session.call_tool)
mock_inner_session = MagicMock()
mock_inner_session.call_tool = AsyncMock(return_value="Issue #42 created")
mock_tools = MagicMock()
mock_tools.connect = AsyncMock()
mock_tools.session = mock_inner_session
mock_tools._connected = False
mock_params = MagicMock()
with (
patch("timmy.mcp_tools.settings") as mock_settings,
patch("agno.tools.mcp.MCPTools", return_value=mock_session),
patch("agno.tools.mcp.MCPTools", return_value=mock_tools),
patch("timmy.mcp_tools._gitea_server_params", return_value=mock_params),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok123"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.mcp_gitea_command = "gitea-mcp -t stdio"
mock_settings.mcp_timeout = 15
mock_settings.repo_root = "/tmp/test"
@@ -136,10 +179,12 @@ async def test_issue_via_mcp_calls_tool():
result = await create_gitea_issue_via_mcp("Bug title", "Bug body", "bug")
assert "Bug title" in result
mock_session.connect.assert_awaited_once()
mock_session.call_tool.assert_awaited_once()
call_args = mock_session.call_tool.call_args
assert call_args[0][0] == "create_issue"
mock_tools.connect.assert_awaited_once()
# Verify it calls session.call_tool (not tools.call_tool)
mock_inner_session.call_tool.assert_awaited_once()
call_args = mock_inner_session.call_tool.call_args
assert call_args[0][0] == "issue_write"
assert call_args[1]["arguments"]["method"] == "create"
assert call_args[1]["arguments"]["owner"] == "owner"
assert call_args[1]["arguments"]["repo"] == "repo"
@@ -152,19 +197,21 @@ async def test_issue_via_mcp_graceful_failure():
"""Issue creation returns error string on MCP failure."""
import timmy.mcp_tools as mcp_mod
mock_session = MagicMock()
mock_session.connect = AsyncMock(side_effect=ConnectionError("no process"))
mock_session._connected = False
mock_tools = MagicMock()
mock_tools.connect = AsyncMock(side_effect=ConnectionError("no process"))
mock_tools._connected = False
mock_params = MagicMock()
with (
patch("timmy.mcp_tools.settings") as mock_settings,
patch("agno.tools.mcp.MCPTools", return_value=mock_session),
patch("agno.tools.mcp.MCPTools", return_value=mock_tools),
patch("timmy.mcp_tools._gitea_server_params", return_value=mock_params),
):
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok123"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://localhost:3000"
mock_settings.mcp_gitea_command = "gitea-mcp -t stdio"
mock_settings.mcp_timeout = 15
mock_settings.repo_root = "/tmp/test"
@@ -207,16 +254,16 @@ def test_bridge_to_work_order(tmp_path):
@pytest.mark.asyncio
async def test_close_mcp_sessions():
"""close_mcp_sessions disconnects the cached session."""
"""close_mcp_sessions closes the cached session."""
import timmy.mcp_tools as mcp_mod
mock_session = MagicMock()
mock_session.disconnect = AsyncMock()
mock_session.close = AsyncMock()
mcp_mod._issue_session = mock_session
await close_mcp_sessions()
mock_session.disconnect.assert_awaited_once()
mock_session.close.assert_awaited_once()
assert mcp_mod._issue_session is None
@@ -240,8 +287,9 @@ def test_mcp_tools_classified_in_safety():
from timmy.tool_safety import DANGEROUS_TOOLS, SAFE_TOOLS, requires_confirmation
# Gitea MCP tools should be safe
assert "create_issue" in SAFE_TOOLS
assert "list_repo_issues" in SAFE_TOOLS
assert "issue_write" in SAFE_TOOLS
assert "list_issues" in SAFE_TOOLS
assert "pull_request_write" in SAFE_TOOLS
# Filesystem read-only MCP tools should be safe
assert "list_directory" in SAFE_TOOLS
@@ -251,6 +299,6 @@ def test_mcp_tools_classified_in_safety():
assert "write_file" in DANGEROUS_TOOLS
# Verify requires_confirmation logic
assert not requires_confirmation("create_issue")
assert not requires_confirmation("issue_write")
assert not requires_confirmation("list_directory")
assert requires_confirmation("write_file")

View File

@@ -1,4 +1,9 @@
"""Tests for timmy.tools_delegation — delegate_task and list_swarm_agents."""
"""Tests for timmy.tools_delegation — delegate_task and list_swarm_agents.
Agent IDs are now defined in config/agents.yaml, not hardcoded Python.
Tests reference the YAML-defined IDs: orchestrator, researcher, coder,
writer, memory, experimenter.
"""
from timmy.tools_delegation import delegate_task, list_swarm_agents
@@ -11,33 +16,37 @@ class TestDelegateTask:
assert result["task_id"] is None
def test_valid_agent_names_normalised(self):
# Should still fail at import (no swarm module), but agent name is accepted
result = delegate_task(" Seer ", "think about it")
# The swarm import will fail, so success=False but error is about import, not agent name
# Agent IDs are lowercased; whitespace should be stripped
result = delegate_task(" Researcher ", "think about it")
assert "Unknown agent" not in result.get("error", "")
def test_invalid_priority_defaults_to_normal(self):
# Even with bad priority, delegate_task should not crash
result = delegate_task("forge", "build", priority="ultra")
result = delegate_task("coder", "build", priority="ultra")
assert isinstance(result, dict)
def test_all_valid_agents_accepted(self):
valid_agents = ["seer", "forge", "echo", "helm", "quill"]
# These IDs match config/agents.yaml
valid_agents = ["orchestrator", "researcher", "coder", "writer", "memory", "experimenter"]
for agent in valid_agents:
result = delegate_task(agent, "test task")
assert "Unknown agent" not in result.get("error", ""), f"{agent} rejected"
def test_mace_no_longer_valid(self):
result = delegate_task("mace", "run security scan")
assert result["success"] is False
assert "Unknown agent" in result["error"]
def test_old_agent_names_no_longer_valid(self):
# Old hardcoded names should not work anymore
for old_name in ["seer", "forge", "echo", "helm", "quill", "mace"]:
result = delegate_task(old_name, "test")
assert result["success"] is False
assert "Unknown agent" in result["error"]
class TestListSwarmAgents:
def test_returns_agents_from_personas(self):
def test_returns_agents_from_yaml(self):
result = list_swarm_agents()
assert result["success"] is True
assert len(result["agents"]) > 0
agent_names = [a["name"] for a in result["agents"]]
# These names come from config/agents.yaml
assert "Seer" in agent_names
assert "Forge" in agent_names
assert "Timmy" in agent_names