Automated salvage commit — agent session ended (exit 124). Work in progress, may need continuation.
501 lines
18 KiB
Python
501 lines
18 KiB
Python
"""Tests for the agent dispatcher (timmy.dispatcher)."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
from timmy.dispatcher import (
|
|
AGENT_REGISTRY,
|
|
AgentType,
|
|
DispatchResult,
|
|
DispatchStatus,
|
|
TaskType,
|
|
_dispatch_local,
|
|
_dispatch_via_api,
|
|
_dispatch_via_gitea,
|
|
dispatch_task,
|
|
infer_task_type,
|
|
select_agent,
|
|
wait_for_completion,
|
|
)
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# Agent registry
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestAgentRegistry:
|
|
def test_all_agents_present(self):
|
|
for member in AgentType:
|
|
assert member in AGENT_REGISTRY, f"AgentType.{member.name} missing from registry"
|
|
|
|
def test_agent_specs_have_display_names(self):
|
|
for agent, spec in AGENT_REGISTRY.items():
|
|
assert spec.display_name, f"{agent} has empty display_name"
|
|
|
|
def test_gitea_agents_have_labels(self):
|
|
for agent, spec in AGENT_REGISTRY.items():
|
|
if spec.interface == "gitea":
|
|
assert spec.gitea_label, f"{agent} is gitea interface but has no label"
|
|
|
|
def test_non_gitea_agents_have_no_labels(self):
|
|
for agent, spec in AGENT_REGISTRY.items():
|
|
if spec.interface not in ("gitea",):
|
|
# api and local agents may have no label
|
|
assert spec.gitea_label is None or spec.interface == "gitea"
|
|
|
|
def test_max_concurrent_positive(self):
|
|
for agent, spec in AGENT_REGISTRY.items():
|
|
assert spec.max_concurrent >= 1, f"{agent} has max_concurrent < 1"
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# select_agent
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestSelectAgent:
|
|
def test_architecture_routes_to_claude(self):
|
|
assert select_agent(TaskType.ARCHITECTURE) == AgentType.CLAUDE_CODE
|
|
|
|
def test_refactoring_routes_to_claude(self):
|
|
assert select_agent(TaskType.REFACTORING) == AgentType.CLAUDE_CODE
|
|
|
|
def test_code_review_routes_to_claude(self):
|
|
assert select_agent(TaskType.CODE_REVIEW) == AgentType.CLAUDE_CODE
|
|
|
|
def test_routine_coding_routes_to_kimi(self):
|
|
assert select_agent(TaskType.ROUTINE_CODING) == AgentType.KIMI_CODE
|
|
|
|
def test_fast_iteration_routes_to_kimi(self):
|
|
assert select_agent(TaskType.FAST_ITERATION) == AgentType.KIMI_CODE
|
|
|
|
def test_research_routes_to_agent_api(self):
|
|
assert select_agent(TaskType.RESEARCH) == AgentType.AGENT_API
|
|
|
|
def test_triage_routes_to_timmy(self):
|
|
assert select_agent(TaskType.TRIAGE) == AgentType.TIMMY
|
|
|
|
def test_planning_routes_to_timmy(self):
|
|
assert select_agent(TaskType.PLANNING) == AgentType.TIMMY
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# infer_task_type
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestInferTaskType:
|
|
def test_architecture_keyword(self):
|
|
assert infer_task_type("Design the LLM router architecture") == TaskType.ARCHITECTURE
|
|
|
|
def test_refactor_keyword(self):
|
|
assert infer_task_type("Refactor the auth middleware") == TaskType.REFACTORING
|
|
|
|
def test_code_review_keyword(self):
|
|
assert infer_task_type("Review PR for cascade router") == TaskType.CODE_REVIEW
|
|
|
|
def test_research_keyword(self):
|
|
assert infer_task_type("Research embedding models") == TaskType.RESEARCH
|
|
|
|
def test_triage_keyword(self):
|
|
assert infer_task_type("Triage open issues") == TaskType.TRIAGE
|
|
|
|
def test_planning_keyword(self):
|
|
assert infer_task_type("Plan the v2.0 roadmap") == TaskType.PLANNING
|
|
|
|
def test_fallback_returns_routine_coding(self):
|
|
assert infer_task_type("Do the thing") == TaskType.ROUTINE_CODING
|
|
|
|
def test_description_contributes_to_inference(self):
|
|
result = infer_task_type("Implement feature", "We need to refactor the old code")
|
|
assert result == TaskType.REFACTORING
|
|
|
|
def test_case_insensitive(self):
|
|
assert infer_task_type("ARCHITECTURE DESIGN") == TaskType.ARCHITECTURE
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# DispatchResult
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDispatchResult:
|
|
def test_success_when_assigned(self):
|
|
r = DispatchResult(
|
|
task_type=TaskType.ROUTINE_CODING,
|
|
agent=AgentType.KIMI_CODE,
|
|
issue_number=1,
|
|
status=DispatchStatus.ASSIGNED,
|
|
)
|
|
assert r.success is True
|
|
|
|
def test_success_when_completed(self):
|
|
r = DispatchResult(
|
|
task_type=TaskType.ROUTINE_CODING,
|
|
agent=AgentType.KIMI_CODE,
|
|
issue_number=1,
|
|
status=DispatchStatus.COMPLETED,
|
|
)
|
|
assert r.success is True
|
|
|
|
def test_not_success_when_failed(self):
|
|
r = DispatchResult(
|
|
task_type=TaskType.ROUTINE_CODING,
|
|
agent=AgentType.KIMI_CODE,
|
|
issue_number=1,
|
|
status=DispatchStatus.FAILED,
|
|
)
|
|
assert r.success is False
|
|
|
|
def test_not_success_when_escalated(self):
|
|
r = DispatchResult(
|
|
task_type=TaskType.ROUTINE_CODING,
|
|
agent=AgentType.KIMI_CODE,
|
|
issue_number=1,
|
|
status=DispatchStatus.ESCALATED,
|
|
)
|
|
assert r.success is False
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _dispatch_local
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDispatchLocal:
|
|
async def test_returns_assigned(self):
|
|
result = await _dispatch_local(
|
|
title="Plan the migration",
|
|
description="We need a plan.",
|
|
acceptance_criteria=["Plan is documented"],
|
|
issue_number=42,
|
|
)
|
|
assert result.status == DispatchStatus.ASSIGNED
|
|
assert result.agent == AgentType.TIMMY
|
|
assert result.issue_number == 42
|
|
|
|
async def test_infers_task_type(self):
|
|
result = await _dispatch_local(
|
|
title="Plan the sprint",
|
|
description="",
|
|
acceptance_criteria=[],
|
|
)
|
|
assert result.task_type == TaskType.PLANNING
|
|
|
|
async def test_no_issue_number(self):
|
|
result = await _dispatch_local(title="Do something", description="")
|
|
assert result.issue_number is None
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _dispatch_via_api
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDispatchViaApi:
|
|
async def test_no_endpoint_returns_failed(self):
|
|
result = await _dispatch_via_api(
|
|
agent=AgentType.AGENT_API,
|
|
title="Analyse logs",
|
|
description="",
|
|
acceptance_criteria=[],
|
|
)
|
|
assert result.status == DispatchStatus.FAILED
|
|
assert "No API endpoint" in (result.error or "")
|
|
|
|
async def test_successful_api_call(self):
|
|
mock_resp = MagicMock()
|
|
mock_resp.status_code = 202
|
|
mock_resp.content = b'{"ok": true}'
|
|
mock_resp.json.return_value = {"ok": True}
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
mock_client.post = AsyncMock(return_value=mock_resp)
|
|
|
|
with patch("httpx.AsyncClient", return_value=mock_client):
|
|
result = await _dispatch_via_api(
|
|
agent=AgentType.AGENT_API,
|
|
title="Analyse logs",
|
|
description="Look at the logs",
|
|
acceptance_criteria=["Report produced"],
|
|
endpoint="http://fake-agent/dispatch",
|
|
)
|
|
|
|
assert result.status == DispatchStatus.ASSIGNED
|
|
assert result.agent == AgentType.AGENT_API
|
|
|
|
async def test_api_error_returns_failed(self):
|
|
mock_resp = MagicMock()
|
|
mock_resp.status_code = 500
|
|
mock_resp.text = "Internal Server Error"
|
|
|
|
mock_client = AsyncMock()
|
|
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
|
mock_client.__aexit__ = AsyncMock(return_value=False)
|
|
mock_client.post = AsyncMock(return_value=mock_resp)
|
|
|
|
with patch("httpx.AsyncClient", return_value=mock_client):
|
|
result = await _dispatch_via_api(
|
|
agent=AgentType.AGENT_API,
|
|
title="Analyse logs",
|
|
description="",
|
|
acceptance_criteria=[],
|
|
endpoint="http://fake-agent/dispatch",
|
|
)
|
|
|
|
assert result.status == DispatchStatus.FAILED
|
|
assert "500" in (result.error or "")
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# _dispatch_via_gitea
|
|
# ---------------------------------------------------------------------------
|
|
|
|
_GITEA_SETTINGS = MagicMock(
|
|
gitea_enabled=True,
|
|
gitea_token="test-token",
|
|
gitea_url="http://gitea.test",
|
|
gitea_repo="owner/repo",
|
|
)
|
|
|
|
|
|
class TestDispatchViaGitea:
|
|
def _make_client(self, label_list=None, label_create_status=201, comment_status=201):
|
|
"""Build a mock httpx.AsyncClient for Gitea interactions."""
|
|
label_resp = MagicMock()
|
|
label_resp.status_code = 200
|
|
label_resp.json.return_value = label_list or []
|
|
|
|
create_label_resp = MagicMock()
|
|
create_label_resp.status_code = label_create_status
|
|
create_label_resp.json.return_value = {"id": 99}
|
|
|
|
apply_label_resp = MagicMock()
|
|
apply_label_resp.status_code = 201
|
|
|
|
comment_resp = MagicMock()
|
|
comment_resp.status_code = comment_status
|
|
comment_resp.json.return_value = {"id": 7}
|
|
|
|
client = AsyncMock()
|
|
client.__aenter__ = AsyncMock(return_value=client)
|
|
client.__aexit__ = AsyncMock(return_value=False)
|
|
client.get = AsyncMock(return_value=label_resp)
|
|
client.post = AsyncMock(side_effect=[create_label_resp, apply_label_resp, comment_resp])
|
|
return client
|
|
|
|
async def test_successful_gitea_dispatch(self):
|
|
client = self._make_client()
|
|
with (
|
|
patch("httpx.AsyncClient", return_value=client),
|
|
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
|
):
|
|
result = await _dispatch_via_gitea(
|
|
agent=AgentType.CLAUDE_CODE,
|
|
issue_number=1072,
|
|
title="Design the router",
|
|
description="We need a cascade router.",
|
|
acceptance_criteria=["Failover works"],
|
|
)
|
|
|
|
assert result.success
|
|
assert result.agent == AgentType.CLAUDE_CODE
|
|
assert result.issue_number == 1072
|
|
assert result.status == DispatchStatus.ASSIGNED
|
|
|
|
async def test_no_gitea_token_returns_failed(self):
|
|
bad_settings = MagicMock(gitea_enabled=True, gitea_token="", gitea_url="http://x", gitea_repo="a/b")
|
|
with patch("timmy.dispatcher.settings", bad_settings):
|
|
result = await _dispatch_via_gitea(
|
|
agent=AgentType.CLAUDE_CODE,
|
|
issue_number=1,
|
|
title="Some task",
|
|
description="",
|
|
acceptance_criteria=[],
|
|
)
|
|
assert result.status == DispatchStatus.FAILED
|
|
assert "not configured" in (result.error or "").lower()
|
|
|
|
async def test_gitea_disabled_returns_failed(self):
|
|
bad_settings = MagicMock(gitea_enabled=False, gitea_token="tok", gitea_url="http://x", gitea_repo="a/b")
|
|
with patch("timmy.dispatcher.settings", bad_settings):
|
|
result = await _dispatch_via_gitea(
|
|
agent=AgentType.CLAUDE_CODE,
|
|
issue_number=1,
|
|
title="Some task",
|
|
description="",
|
|
acceptance_criteria=[],
|
|
)
|
|
assert result.status == DispatchStatus.FAILED
|
|
|
|
async def test_existing_label_reused(self):
|
|
"""When the label already exists, it should be reused (no creation call)."""
|
|
label_resp = MagicMock()
|
|
label_resp.status_code = 200
|
|
label_resp.json.return_value = [{"name": "claude-ready", "id": 55}]
|
|
|
|
apply_resp = MagicMock()
|
|
apply_resp.status_code = 201
|
|
|
|
comment_resp = MagicMock()
|
|
comment_resp.status_code = 201
|
|
comment_resp.json.return_value = {"id": 8}
|
|
|
|
client = AsyncMock()
|
|
client.__aenter__ = AsyncMock(return_value=client)
|
|
client.__aexit__ = AsyncMock(return_value=False)
|
|
client.get = AsyncMock(return_value=label_resp)
|
|
client.post = AsyncMock(side_effect=[apply_resp, comment_resp])
|
|
|
|
with (
|
|
patch("httpx.AsyncClient", return_value=client),
|
|
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
|
):
|
|
result = await _dispatch_via_gitea(
|
|
agent=AgentType.CLAUDE_CODE,
|
|
issue_number=10,
|
|
title="Architecture task",
|
|
description="",
|
|
acceptance_criteria=[],
|
|
)
|
|
|
|
assert result.success
|
|
# Should only have 2 POST calls: apply label + comment (no label creation)
|
|
assert client.post.call_count == 2
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# dispatch_task (integration-style)
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestDispatchTask:
|
|
async def test_empty_title_returns_failed(self):
|
|
result = await dispatch_task(title=" ")
|
|
assert result.status == DispatchStatus.FAILED
|
|
assert "`title` is required" in (result.error or "")
|
|
|
|
async def test_local_dispatch_for_timmy_task(self):
|
|
result = await dispatch_task(
|
|
title="Triage the open issues",
|
|
description="We have 40 open issues.",
|
|
acceptance_criteria=["Issues are labelled"],
|
|
task_type=TaskType.TRIAGE,
|
|
)
|
|
assert result.agent == AgentType.TIMMY
|
|
assert result.success
|
|
|
|
async def test_explicit_agent_override(self):
|
|
"""Caller can force a specific agent regardless of task type."""
|
|
result = await dispatch_task(
|
|
title="Triage the open issues",
|
|
agent=AgentType.TIMMY,
|
|
)
|
|
assert result.agent == AgentType.TIMMY
|
|
|
|
async def test_gitea_dispatch_when_issue_provided(self):
|
|
client_mock = AsyncMock()
|
|
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
|
client_mock.__aexit__ = AsyncMock(return_value=False)
|
|
client_mock.get = AsyncMock(return_value=MagicMock(status_code=200, json=MagicMock(return_value=[])))
|
|
create_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 1}))
|
|
apply_resp = MagicMock(status_code=201)
|
|
comment_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 5}))
|
|
client_mock.post = AsyncMock(side_effect=[create_resp, apply_resp, comment_resp])
|
|
|
|
with (
|
|
patch("httpx.AsyncClient", return_value=client_mock),
|
|
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
|
):
|
|
result = await dispatch_task(
|
|
title="Design the cascade router",
|
|
description="Architecture task.",
|
|
task_type=TaskType.ARCHITECTURE,
|
|
issue_number=1072,
|
|
)
|
|
|
|
assert result.agent == AgentType.CLAUDE_CODE
|
|
assert result.success
|
|
|
|
async def test_escalation_after_max_retries(self):
|
|
"""If all attempts fail, the result is ESCALATED."""
|
|
with (
|
|
patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch,
|
|
patch("timmy.dispatcher._log_escalation", new_callable=AsyncMock),
|
|
):
|
|
mock_dispatch.return_value = DispatchResult(
|
|
task_type=TaskType.ARCHITECTURE,
|
|
agent=AgentType.CLAUDE_CODE,
|
|
issue_number=1,
|
|
status=DispatchStatus.FAILED,
|
|
error="Gitea offline",
|
|
)
|
|
result = await dispatch_task(
|
|
title="Design router",
|
|
task_type=TaskType.ARCHITECTURE,
|
|
issue_number=1,
|
|
max_retries=1,
|
|
)
|
|
|
|
assert result.status == DispatchStatus.ESCALATED
|
|
assert mock_dispatch.call_count == 2 # initial + 1 retry
|
|
|
|
async def test_no_retry_on_success(self):
|
|
with patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch:
|
|
mock_dispatch.return_value = DispatchResult(
|
|
task_type=TaskType.ARCHITECTURE,
|
|
agent=AgentType.CLAUDE_CODE,
|
|
issue_number=1,
|
|
status=DispatchStatus.ASSIGNED,
|
|
comment_id=42,
|
|
label_applied="claude-ready",
|
|
)
|
|
result = await dispatch_task(
|
|
title="Design router",
|
|
task_type=TaskType.ARCHITECTURE,
|
|
issue_number=1,
|
|
max_retries=2,
|
|
)
|
|
|
|
assert result.success
|
|
assert mock_dispatch.call_count == 1 # no retries needed
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
# wait_for_completion
|
|
# ---------------------------------------------------------------------------
|
|
|
|
class TestWaitForCompletion:
|
|
async def test_returns_completed_when_issue_closed(self):
|
|
closed_resp = MagicMock(
|
|
status_code=200,
|
|
json=MagicMock(return_value={"state": "closed"}),
|
|
)
|
|
client_mock = AsyncMock()
|
|
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
|
client_mock.__aexit__ = AsyncMock(return_value=False)
|
|
client_mock.get = AsyncMock(return_value=closed_resp)
|
|
|
|
with (
|
|
patch("httpx.AsyncClient", return_value=client_mock),
|
|
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
|
):
|
|
status = await wait_for_completion(issue_number=42, poll_interval=0, max_wait=5)
|
|
|
|
assert status == DispatchStatus.COMPLETED
|
|
|
|
async def test_returns_timed_out_when_still_open(self):
|
|
open_resp = MagicMock(
|
|
status_code=200,
|
|
json=MagicMock(return_value={"state": "open"}),
|
|
)
|
|
client_mock = AsyncMock()
|
|
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
|
client_mock.__aexit__ = AsyncMock(return_value=False)
|
|
client_mock.get = AsyncMock(return_value=open_resp)
|
|
|
|
with (
|
|
patch("httpx.AsyncClient", return_value=client_mock),
|
|
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
|
patch("asyncio.sleep", new_callable=AsyncMock),
|
|
):
|
|
status = await wait_for_completion(issue_number=42, poll_interval=1, max_wait=2)
|
|
|
|
assert status == DispatchStatus.TIMED_OUT
|