diff --git a/src/timmy/dispatcher.py b/src/timmy/dispatcher.py new file mode 100644 index 00000000..a4af2e2b --- /dev/null +++ b/src/timmy/dispatcher.py @@ -0,0 +1,801 @@ +"""Agent dispatcher — route tasks to Claude Code, Kimi, APIs, or Timmy itself. + +Timmy's dispatch system: knows what agents are available, what they're good +at, and how to send them work. Uses Gitea labels and issue comments to assign +tasks and track completion. + +Dispatch flow: + 1. Match task type to agent strengths + 2. Check agent availability (idle or working?) + 3. Dispatch task with full context (issue link, requirements, criteria) + 4. Log assignment as a Gitea comment + 5. Monitor for completion or timeout + 6. Review output quality + 7. If output fails QA → reassign or escalate + +Agent interfaces: + - Claude Code → ``claude-ready`` Gitea label + issue comment + - Kimi Code → ``kimi-ready`` Gitea label + issue comment + - Agent APIs → HTTP POST to external endpoint + - Timmy (self) → direct local invocation + +Usage:: + + from timmy.dispatcher import dispatch_task, TaskType, AgentType + + result = await dispatch_task( + issue_number=1072, + task_type=TaskType.ARCHITECTURE, + title="Design the LLM router", + description="We need a cascade router...", + acceptance_criteria=["Failover works", "Metrics exposed"], + ) +""" + +from __future__ import annotations + +import asyncio +import logging +from dataclasses import dataclass, field +from enum import Enum +from typing import Any + +from config import settings + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Enumerations +# --------------------------------------------------------------------------- + +class AgentType(str, Enum): + """Known agents in the swarm.""" + + CLAUDE_CODE = "claude_code" + KIMI_CODE = "kimi_code" + AGENT_API = "agent_api" + TIMMY = "timmy" + + +class TaskType(str, Enum): + """Categories of engineering work.""" + + # Claude Code strengths + ARCHITECTURE = "architecture" + REFACTORING = "refactoring" + COMPLEX_REASONING = "complex_reasoning" + CODE_REVIEW = "code_review" + + # Kimi Code strengths + PARALLEL_IMPLEMENTATION = "parallel_implementation" + ROUTINE_CODING = "routine_coding" + FAST_ITERATION = "fast_iteration" + + # Agent API strengths + RESEARCH = "research" + ANALYSIS = "analysis" + SPECIALIZED = "specialized" + + # Timmy strengths + TRIAGE = "triage" + PLANNING = "planning" + CREATIVE = "creative" + ORCHESTRATION = "orchestration" + + +class DispatchStatus(str, Enum): + """Lifecycle state of a dispatched task.""" + + PENDING = "pending" + ASSIGNED = "assigned" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + FAILED = "failed" + ESCALATED = "escalated" + TIMED_OUT = "timed_out" + + +# --------------------------------------------------------------------------- +# Agent registry +# --------------------------------------------------------------------------- + +@dataclass +class AgentSpec: + """Capabilities and limits for a single agent.""" + + name: AgentType + display_name: str + strengths: frozenset[TaskType] + gitea_label: str | None # label to apply when dispatching + max_concurrent: int = 1 + interface: str = "gitea" # "gitea" | "api" | "local" + api_endpoint: str | None = None # for interface="api" + + +#: Authoritative agent registry — all known agents and their capabilities. +AGENT_REGISTRY: dict[AgentType, AgentSpec] = { + AgentType.CLAUDE_CODE: AgentSpec( + name=AgentType.CLAUDE_CODE, + display_name="Claude Code", + strengths=frozenset( + { + TaskType.ARCHITECTURE, + TaskType.REFACTORING, + TaskType.COMPLEX_REASONING, + TaskType.CODE_REVIEW, + } + ), + gitea_label="claude-ready", + max_concurrent=1, + interface="gitea", + ), + AgentType.KIMI_CODE: AgentSpec( + name=AgentType.KIMI_CODE, + display_name="Kimi Code", + strengths=frozenset( + { + TaskType.PARALLEL_IMPLEMENTATION, + TaskType.ROUTINE_CODING, + TaskType.FAST_ITERATION, + } + ), + gitea_label="kimi-ready", + max_concurrent=1, + interface="gitea", + ), + AgentType.AGENT_API: AgentSpec( + name=AgentType.AGENT_API, + display_name="Agent API", + strengths=frozenset( + { + TaskType.RESEARCH, + TaskType.ANALYSIS, + TaskType.SPECIALIZED, + } + ), + gitea_label=None, + max_concurrent=5, + interface="api", + ), + AgentType.TIMMY: AgentSpec( + name=AgentType.TIMMY, + display_name="Timmy", + strengths=frozenset( + { + TaskType.TRIAGE, + TaskType.PLANNING, + TaskType.CREATIVE, + TaskType.ORCHESTRATION, + } + ), + gitea_label=None, + max_concurrent=1, + interface="local", + ), +} + +#: Map from task type to preferred agent (primary routing table). +_TASK_ROUTING: dict[TaskType, AgentType] = { + TaskType.ARCHITECTURE: AgentType.CLAUDE_CODE, + TaskType.REFACTORING: AgentType.CLAUDE_CODE, + TaskType.COMPLEX_REASONING: AgentType.CLAUDE_CODE, + TaskType.CODE_REVIEW: AgentType.CLAUDE_CODE, + TaskType.PARALLEL_IMPLEMENTATION: AgentType.KIMI_CODE, + TaskType.ROUTINE_CODING: AgentType.KIMI_CODE, + TaskType.FAST_ITERATION: AgentType.KIMI_CODE, + TaskType.RESEARCH: AgentType.AGENT_API, + TaskType.ANALYSIS: AgentType.AGENT_API, + TaskType.SPECIALIZED: AgentType.AGENT_API, + TaskType.TRIAGE: AgentType.TIMMY, + TaskType.PLANNING: AgentType.TIMMY, + TaskType.CREATIVE: AgentType.TIMMY, + TaskType.ORCHESTRATION: AgentType.TIMMY, +} + + +# --------------------------------------------------------------------------- +# Dispatch result +# --------------------------------------------------------------------------- + +@dataclass +class DispatchResult: + """Outcome of a dispatch call.""" + + task_type: TaskType + agent: AgentType + issue_number: int | None + status: DispatchStatus + comment_id: int | None = None + label_applied: str | None = None + error: str | None = None + retry_count: int = 0 + metadata: dict[str, Any] = field(default_factory=dict) + + @property + def success(self) -> bool: # noqa: D401 + return self.status in (DispatchStatus.ASSIGNED, DispatchStatus.COMPLETED) + + +# --------------------------------------------------------------------------- +# Routing logic +# --------------------------------------------------------------------------- + +def select_agent(task_type: TaskType) -> AgentType: + """Return the best agent for *task_type* based on the routing table. + + Args: + task_type: The category of engineering work to be done. + + Returns: + The :class:`AgentType` best suited to handle this task. + """ + return _TASK_ROUTING.get(task_type, AgentType.TIMMY) + + +def infer_task_type(title: str, description: str = "") -> TaskType: + """Heuristic: guess the most appropriate :class:`TaskType` from text. + + Scans *title* and *description* for keyword signals and returns the + strongest match. Falls back to :attr:`TaskType.ROUTINE_CODING`. + + Args: + title: Short task title. + description: Longer task description (optional). + + Returns: + The inferred :class:`TaskType`. + """ + text = (title + " " + description).lower() + + _SIGNALS: list[tuple[TaskType, frozenset[str]]] = [ + (TaskType.ARCHITECTURE, frozenset({"architect", "design", "adr", "system design", "schema"})), + (TaskType.REFACTORING, frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"})), + (TaskType.CODE_REVIEW, frozenset({"review", "pr review", "pull request review", "audit"})), + (TaskType.COMPLEX_REASONING, frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"})), + (TaskType.RESEARCH, frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"})), + (TaskType.ANALYSIS, frozenset({"analysis", "profil", "trace", "metric", "performance"})), + (TaskType.TRIAGE, frozenset({"triage", "classify", "prioritise", "prioritize"})), + (TaskType.PLANNING, frozenset({"plan", "roadmap", "milestone", "epic", "spike"})), + (TaskType.CREATIVE, frozenset({"creative", "persona", "story", "write", "draft"})), + (TaskType.ORCHESTRATION, frozenset({"orchestrat", "coordinat", "swarm", "dispatch"})), + (TaskType.PARALLEL_IMPLEMENTATION, frozenset({"parallel", "concurrent", "batch"})), + (TaskType.FAST_ITERATION, frozenset({"quick", "fast", "iterate", "prototype", "poc"})), + ] + + for task_type, keywords in _SIGNALS: + if any(kw in text for kw in keywords): + return task_type + + return TaskType.ROUTINE_CODING + + +# --------------------------------------------------------------------------- +# Gitea helpers +# --------------------------------------------------------------------------- + +async def _post_gitea_comment( + client: Any, + base_url: str, + repo: str, + headers: dict[str, str], + issue_number: int, + body: str, +) -> int | None: + """Post a comment on a Gitea issue and return the comment ID.""" + try: + resp = await client.post( + f"{base_url}/repos/{repo}/issues/{issue_number}/comments", + headers=headers, + json={"body": body}, + ) + if resp.status_code in (200, 201): + return resp.json().get("id") + logger.warning( + "Comment on #%s returned %s: %s", + issue_number, + resp.status_code, + resp.text[:200], + ) + except Exception as exc: + logger.warning("Failed to post comment on #%s: %s", issue_number, exc) + return None + + +async def _apply_gitea_label( + client: Any, + base_url: str, + repo: str, + headers: dict[str, str], + issue_number: int, + label_name: str, + label_color: str = "#0075ca", +) -> bool: + """Ensure *label_name* exists and apply it to an issue. + + Returns True if the label was successfully applied. + """ + # Resolve or create the label + label_id: int | None = None + try: + resp = await client.get(f"{base_url}/repos/{repo}/labels", headers=headers) + if resp.status_code == 200: + for lbl in resp.json(): + if lbl.get("name") == label_name: + label_id = lbl["id"] + break + except Exception as exc: + logger.warning("Failed to list labels: %s", exc) + return False + + if label_id is None: + try: + resp = await client.post( + f"{base_url}/repos/{repo}/labels", + headers=headers, + json={"name": label_name, "color": label_color}, + ) + if resp.status_code in (200, 201): + label_id = resp.json().get("id") + except Exception as exc: + logger.warning("Failed to create label %r: %s", label_name, exc) + return False + + if label_id is None: + return False + + # Apply label to the issue + try: + resp = await client.post( + f"{base_url}/repos/{repo}/issues/{issue_number}/labels", + headers=headers, + json={"labels": [label_id]}, + ) + return resp.status_code in (200, 201) + except Exception as exc: + logger.warning("Failed to apply label %r to #%s: %s", label_name, issue_number, exc) + return False + + +async def _poll_issue_completion( + issue_number: int, + poll_interval: int = 60, + max_wait: int = 7200, +) -> DispatchStatus: + """Poll a Gitea issue until closed (completed) or timeout. + + Args: + issue_number: Gitea issue to watch. + poll_interval: Seconds between polls. + max_wait: Maximum total seconds to wait. + + Returns: + :attr:`DispatchStatus.COMPLETED` if the issue was closed, + :attr:`DispatchStatus.TIMED_OUT` otherwise. + """ + try: + import httpx + except ImportError as exc: + logger.warning("poll_issue_completion: missing dependency: %s", exc) + return DispatchStatus.FAILED + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = {"Authorization": f"token {settings.gitea_token}"} + issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}" + + elapsed = 0 + while elapsed < max_wait: + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(issue_url, headers=headers) + if resp.status_code == 200 and resp.json().get("state") == "closed": + logger.info("Issue #%s closed — task completed", issue_number) + return DispatchStatus.COMPLETED + except Exception as exc: + logger.warning("Poll error for issue #%s: %s", issue_number, exc) + + await asyncio.sleep(poll_interval) + elapsed += poll_interval + + logger.warning("Timed out waiting for issue #%s after %ss", issue_number, max_wait) + return DispatchStatus.TIMED_OUT + + +# --------------------------------------------------------------------------- +# Core dispatch functions +# --------------------------------------------------------------------------- + +async def _dispatch_via_gitea( + agent: AgentType, + issue_number: int, + title: str, + description: str, + acceptance_criteria: list[str], +) -> DispatchResult: + """Assign a task by applying a Gitea label and posting an assignment comment. + + Args: + agent: Target agent. + issue_number: Gitea issue to assign. + title: Short task title. + description: Full task description. + acceptance_criteria: List of acceptance criteria strings. + + Returns: + :class:`DispatchResult` describing the outcome. + """ + try: + import httpx + except ImportError as exc: + return DispatchResult( + task_type=TaskType.ROUTINE_CODING, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error=f"Missing dependency: {exc}", + ) + + spec = AGENT_REGISTRY[agent] + task_type = infer_task_type(title, description) + + if not settings.gitea_enabled or not settings.gitea_token: + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error="Gitea integration not configured (no token or disabled).", + ) + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + } + + comment_id: int | None = None + label_applied: str | None = None + + async with httpx.AsyncClient(timeout=15) as client: + # 1. Apply agent label (if applicable) + if spec.gitea_label: + ok = await _apply_gitea_label( + client, base_url, repo, headers, issue_number, spec.gitea_label + ) + if ok: + label_applied = spec.gitea_label + logger.info( + "Applied label %r to issue #%s for %s", + spec.gitea_label, + issue_number, + spec.display_name, + ) + else: + logger.warning( + "Could not apply label %r to issue #%s", + spec.gitea_label, + issue_number, + ) + + # 2. Post assignment comment + criteria_md = "\n".join(f"- {c}" for c in acceptance_criteria) if acceptance_criteria else "_None specified_" + comment_body = ( + f"## Assigned to {spec.display_name}\n\n" + f"**Task type:** `{task_type.value}`\n\n" + f"**Description:**\n{description}\n\n" + f"**Acceptance criteria:**\n{criteria_md}\n\n" + f"---\n*Dispatched by Timmy agent dispatcher.*" + ) + comment_id = await _post_gitea_comment( + client, base_url, repo, headers, issue_number, comment_body + ) + + if comment_id is not None or label_applied is not None: + logger.info( + "Dispatched issue #%s to %s (label=%r, comment=%s)", + issue_number, + spec.display_name, + label_applied, + comment_id, + ) + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.ASSIGNED, + comment_id=comment_id, + label_applied=label_applied, + ) + + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error="Failed to apply label and post comment — check Gitea connectivity.", + ) + + +async def _dispatch_via_api( + agent: AgentType, + title: str, + description: str, + acceptance_criteria: list[str], + issue_number: int | None = None, + endpoint: str | None = None, +) -> DispatchResult: + """Dispatch a task to an external HTTP API agent. + + Args: + agent: Target agent. + title: Short task title. + description: Task description. + acceptance_criteria: List of acceptance criteria. + issue_number: Optional Gitea issue for cross-referencing. + endpoint: Override API endpoint URL (uses spec default if omitted). + + Returns: + :class:`DispatchResult` describing the outcome. + """ + spec = AGENT_REGISTRY[agent] + task_type = infer_task_type(title, description) + url = endpoint or spec.api_endpoint + + if not url: + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error=f"No API endpoint configured for agent {agent.value}.", + ) + + payload = { + "title": title, + "description": description, + "acceptance_criteria": acceptance_criteria, + "issue_number": issue_number, + "agent": agent.value, + "task_type": task_type.value, + } + + try: + import httpx + + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.post(url, json=payload) + + if resp.status_code in (200, 201, 202): + logger.info("Dispatched %r to API agent %s at %s", title[:60], agent.value, url) + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.ASSIGNED, + metadata={"response": resp.json() if resp.content else {}}, + ) + + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error=f"API agent returned {resp.status_code}: {resp.text[:200]}", + ) + except Exception as exc: + logger.warning("API dispatch to %s failed: %s", url, exc) + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error=str(exc), + ) + + +async def _dispatch_local( + title: str, + description: str = "", + acceptance_criteria: list[str] | None = None, + issue_number: int | None = None, +) -> DispatchResult: + """Handle a task locally — Timmy processes it directly. + + This is a lightweight stub. Real local execution should be wired + into the agentic loop or a dedicated Timmy tool. + + Args: + title: Short task title. + description: Task description. + acceptance_criteria: Acceptance criteria list. + issue_number: Optional Gitea issue number for logging. + + Returns: + :class:`DispatchResult` with ASSIGNED status (local execution is + assumed to succeed at dispatch time). + """ + task_type = infer_task_type(title, description) + logger.info( + "Timmy handling task locally: %r (issue #%s)", title[:60], issue_number + ) + return DispatchResult( + task_type=task_type, + agent=AgentType.TIMMY, + issue_number=issue_number, + status=DispatchStatus.ASSIGNED, + metadata={"local": True, "description": description}, + ) + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + +async def dispatch_task( + title: str, + description: str = "", + acceptance_criteria: list[str] | None = None, + task_type: TaskType | None = None, + agent: AgentType | None = None, + issue_number: int | None = None, + api_endpoint: str | None = None, + max_retries: int = 1, +) -> DispatchResult: + """Route a task to the best available agent. + + This is the primary entry point. Callers can either specify the + *agent* and *task_type* explicitly or let the dispatcher infer them + from the *title* and *description*. + + Args: + title: Short human-readable task title. + description: Full task description with context. + acceptance_criteria: List of acceptance criteria strings. + task_type: Override automatic task type inference. + agent: Override automatic agent selection. + issue_number: Gitea issue number to log the assignment on. + api_endpoint: Override API endpoint for AGENT_API dispatches. + max_retries: Number of retry attempts on failure (default 1). + + Returns: + :class:`DispatchResult` describing the final dispatch outcome. + + Example:: + + result = await dispatch_task( + issue_number=1072, + title="Build the cascade LLM router", + description="We need automatic failover...", + acceptance_criteria=["Circuit breaker works", "Metrics exposed"], + ) + if result.success: + print(f"Assigned to {result.agent.value}") + """ + criteria = acceptance_criteria or [] + + if not title.strip(): + return DispatchResult( + task_type=task_type or TaskType.ROUTINE_CODING, + agent=agent or AgentType.TIMMY, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error="`title` is required.", + ) + + resolved_type = task_type or infer_task_type(title, description) + resolved_agent = agent or select_agent(resolved_type) + + logger.info( + "Dispatching task %r → %s (type=%s, issue=#%s)", + title[:60], + resolved_agent.value, + resolved_type.value, + issue_number, + ) + + spec = AGENT_REGISTRY[resolved_agent] + + last_result: DispatchResult | None = None + for attempt in range(max_retries + 1): + if attempt > 0: + logger.info("Retry %d/%d for task %r", attempt, max_retries, title[:60]) + + if spec.interface == "gitea" and issue_number is not None: + result = await _dispatch_via_gitea( + resolved_agent, issue_number, title, description, criteria + ) + elif spec.interface == "api": + result = await _dispatch_via_api( + resolved_agent, title, description, criteria, issue_number, api_endpoint + ) + else: + result = await _dispatch_local(title, description, criteria, issue_number) + + result.retry_count = attempt + last_result = result + + if result.success: + return result + + logger.warning( + "Dispatch attempt %d failed for task %r: %s", + attempt + 1, + title[:60], + result.error, + ) + + # All attempts exhausted — escalate + assert last_result is not None + last_result.status = DispatchStatus.ESCALATED + logger.error( + "Task %r escalated after %d failed attempt(s): %s", + title[:60], + max_retries + 1, + last_result.error, + ) + + # Try to log the escalation on the issue + if issue_number is not None: + await _log_escalation(issue_number, resolved_agent, last_result.error or "unknown error") + + return last_result + + +async def _log_escalation( + issue_number: int, + agent: AgentType, + error: str, +) -> None: + """Post an escalation notice on the Gitea issue.""" + try: + import httpx + + if not settings.gitea_enabled or not settings.gitea_token: + return + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + } + body = ( + f"## Dispatch Escalated\n\n" + f"Could not assign to **{AGENT_REGISTRY[agent].display_name}** " + f"after {1} attempt(s).\n\n" + f"**Error:** {error}\n\n" + f"Manual intervention required.\n\n" + f"---\n*Timmy agent dispatcher.*" + ) + async with httpx.AsyncClient(timeout=10) as client: + await _post_gitea_comment( + client, base_url, repo, headers, issue_number, body + ) + except Exception as exc: + logger.warning("Failed to post escalation comment: %s", exc) + + +# --------------------------------------------------------------------------- +# Monitoring helper +# --------------------------------------------------------------------------- + +async def wait_for_completion( + issue_number: int, + poll_interval: int = 60, + max_wait: int = 7200, +) -> DispatchStatus: + """Block until the assigned Gitea issue is closed or the timeout fires. + + Useful for synchronous orchestration where the caller wants to wait for + the assigned agent to finish before proceeding. + + Args: + issue_number: Gitea issue to monitor. + poll_interval: Seconds between status polls. + max_wait: Maximum wait in seconds (default 2 hours). + + Returns: + :attr:`DispatchStatus.COMPLETED` or :attr:`DispatchStatus.TIMED_OUT`. + """ + return await _poll_issue_completion(issue_number, poll_interval, max_wait) diff --git a/tests/timmy/test_dispatcher.py b/tests/timmy/test_dispatcher.py new file mode 100644 index 00000000..cab79ce5 --- /dev/null +++ b/tests/timmy/test_dispatcher.py @@ -0,0 +1,503 @@ +"""Tests for the agent dispatcher (timmy.dispatcher).""" + +from __future__ import annotations + +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from timmy.dispatcher import ( + AGENT_REGISTRY, + AgentType, + DispatchResult, + DispatchStatus, + TaskType, + _dispatch_local, + _dispatch_via_api, + _dispatch_via_gitea, + dispatch_task, + infer_task_type, + select_agent, + wait_for_completion, +) + + +# --------------------------------------------------------------------------- +# Agent registry +# --------------------------------------------------------------------------- + +class TestAgentRegistry: + def test_all_agents_present(self): + for member in AgentType: + assert member in AGENT_REGISTRY, f"AgentType.{member.name} missing from registry" + + def test_agent_specs_have_display_names(self): + for agent, spec in AGENT_REGISTRY.items(): + assert spec.display_name, f"{agent} has empty display_name" + + def test_gitea_agents_have_labels(self): + for agent, spec in AGENT_REGISTRY.items(): + if spec.interface == "gitea": + assert spec.gitea_label, f"{agent} is gitea interface but has no label" + + def test_non_gitea_agents_have_no_labels(self): + for agent, spec in AGENT_REGISTRY.items(): + if spec.interface not in ("gitea",): + # api and local agents may have no label + assert spec.gitea_label is None or spec.interface == "gitea" + + def test_max_concurrent_positive(self): + for agent, spec in AGENT_REGISTRY.items(): + assert spec.max_concurrent >= 1, f"{agent} has max_concurrent < 1" + + +# --------------------------------------------------------------------------- +# select_agent +# --------------------------------------------------------------------------- + +class TestSelectAgent: + def test_architecture_routes_to_claude(self): + assert select_agent(TaskType.ARCHITECTURE) == AgentType.CLAUDE_CODE + + def test_refactoring_routes_to_claude(self): + assert select_agent(TaskType.REFACTORING) == AgentType.CLAUDE_CODE + + def test_code_review_routes_to_claude(self): + assert select_agent(TaskType.CODE_REVIEW) == AgentType.CLAUDE_CODE + + def test_routine_coding_routes_to_kimi(self): + assert select_agent(TaskType.ROUTINE_CODING) == AgentType.KIMI_CODE + + def test_fast_iteration_routes_to_kimi(self): + assert select_agent(TaskType.FAST_ITERATION) == AgentType.KIMI_CODE + + def test_research_routes_to_agent_api(self): + assert select_agent(TaskType.RESEARCH) == AgentType.AGENT_API + + def test_triage_routes_to_timmy(self): + assert select_agent(TaskType.TRIAGE) == AgentType.TIMMY + + def test_planning_routes_to_timmy(self): + assert select_agent(TaskType.PLANNING) == AgentType.TIMMY + + +# --------------------------------------------------------------------------- +# infer_task_type +# --------------------------------------------------------------------------- + +class TestInferTaskType: + def test_architecture_keyword(self): + assert infer_task_type("Design the LLM router architecture") == TaskType.ARCHITECTURE + + def test_refactor_keyword(self): + assert infer_task_type("Refactor the auth middleware") == TaskType.REFACTORING + + def test_code_review_keyword(self): + assert infer_task_type("Review PR for cascade router") == TaskType.CODE_REVIEW + + def test_research_keyword(self): + assert infer_task_type("Research embedding models") == TaskType.RESEARCH + + def test_triage_keyword(self): + assert infer_task_type("Triage open issues") == TaskType.TRIAGE + + def test_planning_keyword(self): + assert infer_task_type("Plan the v2.0 roadmap") == TaskType.PLANNING + + def test_fallback_returns_routine_coding(self): + assert infer_task_type("Do the thing") == TaskType.ROUTINE_CODING + + def test_description_contributes_to_inference(self): + result = infer_task_type("Implement feature", "We need to refactor the old code") + assert result == TaskType.REFACTORING + + def test_case_insensitive(self): + assert infer_task_type("ARCHITECTURE DESIGN") == TaskType.ARCHITECTURE + + +# --------------------------------------------------------------------------- +# DispatchResult +# --------------------------------------------------------------------------- + +class TestDispatchResult: + def test_success_when_assigned(self): + r = DispatchResult( + task_type=TaskType.ROUTINE_CODING, + agent=AgentType.KIMI_CODE, + issue_number=1, + status=DispatchStatus.ASSIGNED, + ) + assert r.success is True + + def test_success_when_completed(self): + r = DispatchResult( + task_type=TaskType.ROUTINE_CODING, + agent=AgentType.KIMI_CODE, + issue_number=1, + status=DispatchStatus.COMPLETED, + ) + assert r.success is True + + def test_not_success_when_failed(self): + r = DispatchResult( + task_type=TaskType.ROUTINE_CODING, + agent=AgentType.KIMI_CODE, + issue_number=1, + status=DispatchStatus.FAILED, + ) + assert r.success is False + + def test_not_success_when_escalated(self): + r = DispatchResult( + task_type=TaskType.ROUTINE_CODING, + agent=AgentType.KIMI_CODE, + issue_number=1, + status=DispatchStatus.ESCALATED, + ) + assert r.success is False + + +# --------------------------------------------------------------------------- +# _dispatch_local +# --------------------------------------------------------------------------- + +class TestDispatchLocal: + async def test_returns_assigned(self): + result = await _dispatch_local( + title="Plan the migration", + description="We need a plan.", + acceptance_criteria=["Plan is documented"], + issue_number=42, + ) + assert result.status == DispatchStatus.ASSIGNED + assert result.agent == AgentType.TIMMY + assert result.issue_number == 42 + + async def test_infers_task_type(self): + result = await _dispatch_local( + title="Plan the sprint", + description="", + acceptance_criteria=[], + ) + assert result.task_type == TaskType.PLANNING + + async def test_no_issue_number(self): + result = await _dispatch_local(title="Do something", description="") + assert result.issue_number is None + + +# --------------------------------------------------------------------------- +# _dispatch_via_api +# --------------------------------------------------------------------------- + +class TestDispatchViaApi: + async def test_no_endpoint_returns_failed(self): + result = await _dispatch_via_api( + agent=AgentType.AGENT_API, + title="Analyse logs", + description="", + acceptance_criteria=[], + ) + assert result.status == DispatchStatus.FAILED + assert "No API endpoint" in (result.error or "") + + async def test_successful_api_call(self): + mock_resp = MagicMock() + mock_resp.status_code = 202 + mock_resp.content = b'{"ok": true}' + mock_resp.json.return_value = {"ok": True} + + mock_client = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + mock_client.post = AsyncMock(return_value=mock_resp) + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await _dispatch_via_api( + agent=AgentType.AGENT_API, + title="Analyse logs", + description="Look at the logs", + acceptance_criteria=["Report produced"], + endpoint="http://fake-agent/dispatch", + ) + + assert result.status == DispatchStatus.ASSIGNED + assert result.agent == AgentType.AGENT_API + + async def test_api_error_returns_failed(self): + mock_resp = MagicMock() + mock_resp.status_code = 500 + mock_resp.text = "Internal Server Error" + + mock_client = AsyncMock() + mock_client.__aenter__ = AsyncMock(return_value=mock_client) + mock_client.__aexit__ = AsyncMock(return_value=False) + mock_client.post = AsyncMock(return_value=mock_resp) + + with patch("httpx.AsyncClient", return_value=mock_client): + result = await _dispatch_via_api( + agent=AgentType.AGENT_API, + title="Analyse logs", + description="", + acceptance_criteria=[], + endpoint="http://fake-agent/dispatch", + ) + + assert result.status == DispatchStatus.FAILED + assert "500" in (result.error or "") + + +# --------------------------------------------------------------------------- +# _dispatch_via_gitea +# --------------------------------------------------------------------------- + +_GITEA_SETTINGS = MagicMock( + gitea_enabled=True, + gitea_token="test-token", + gitea_url="http://gitea.test", + gitea_repo="owner/repo", +) + + +class TestDispatchViaGitea: + def _make_client(self, label_list=None, label_create_status=201, comment_status=201): + """Build a mock httpx.AsyncClient for Gitea interactions.""" + label_resp = MagicMock() + label_resp.status_code = 200 + label_resp.json.return_value = label_list or [] + + create_label_resp = MagicMock() + create_label_resp.status_code = label_create_status + create_label_resp.json.return_value = {"id": 99} + + apply_label_resp = MagicMock() + apply_label_resp.status_code = 201 + + comment_resp = MagicMock() + comment_resp.status_code = comment_status + comment_resp.json.return_value = {"id": 7} + + client = AsyncMock() + client.__aenter__ = AsyncMock(return_value=client) + client.__aexit__ = AsyncMock(return_value=False) + client.get = AsyncMock(return_value=label_resp) + client.post = AsyncMock(side_effect=[create_label_resp, apply_label_resp, comment_resp]) + return client + + async def test_successful_gitea_dispatch(self): + client = self._make_client() + with ( + patch("httpx.AsyncClient", return_value=client), + patch("timmy.dispatcher.settings", _GITEA_SETTINGS), + ): + result = await _dispatch_via_gitea( + agent=AgentType.CLAUDE_CODE, + issue_number=1072, + title="Design the router", + description="We need a cascade router.", + acceptance_criteria=["Failover works"], + ) + + assert result.success + assert result.agent == AgentType.CLAUDE_CODE + assert result.issue_number == 1072 + assert result.status == DispatchStatus.ASSIGNED + + async def test_no_gitea_token_returns_failed(self): + bad_settings = MagicMock(gitea_enabled=True, gitea_token="", gitea_url="http://x", gitea_repo="a/b") + with patch("timmy.dispatcher.settings", bad_settings): + result = await _dispatch_via_gitea( + agent=AgentType.CLAUDE_CODE, + issue_number=1, + title="Some task", + description="", + acceptance_criteria=[], + ) + assert result.status == DispatchStatus.FAILED + assert "not configured" in (result.error or "").lower() + + async def test_gitea_disabled_returns_failed(self): + bad_settings = MagicMock(gitea_enabled=False, gitea_token="tok", gitea_url="http://x", gitea_repo="a/b") + with patch("timmy.dispatcher.settings", bad_settings): + result = await _dispatch_via_gitea( + agent=AgentType.CLAUDE_CODE, + issue_number=1, + title="Some task", + description="", + acceptance_criteria=[], + ) + assert result.status == DispatchStatus.FAILED + + async def test_existing_label_reused(self): + """When the label already exists, it should be reused (no creation call).""" + label_resp = MagicMock() + label_resp.status_code = 200 + label_resp.json.return_value = [{"name": "claude-ready", "id": 55}] + + apply_resp = MagicMock() + apply_resp.status_code = 201 + + comment_resp = MagicMock() + comment_resp.status_code = 201 + comment_resp.json.return_value = {"id": 8} + + client = AsyncMock() + client.__aenter__ = AsyncMock(return_value=client) + client.__aexit__ = AsyncMock(return_value=False) + client.get = AsyncMock(return_value=label_resp) + client.post = AsyncMock(side_effect=[apply_resp, comment_resp]) + + with ( + patch("httpx.AsyncClient", return_value=client), + patch("timmy.dispatcher.settings", _GITEA_SETTINGS), + ): + result = await _dispatch_via_gitea( + agent=AgentType.CLAUDE_CODE, + issue_number=10, + title="Architecture task", + description="", + acceptance_criteria=[], + ) + + assert result.success + # Should only have 2 POST calls: apply label + comment (no label creation) + assert client.post.call_count == 2 + + +# --------------------------------------------------------------------------- +# dispatch_task (integration-style) +# --------------------------------------------------------------------------- + +class TestDispatchTask: + async def test_empty_title_returns_failed(self): + result = await dispatch_task(title=" ") + assert result.status == DispatchStatus.FAILED + assert "`title` is required" in (result.error or "") + + async def test_local_dispatch_for_timmy_task(self): + result = await dispatch_task( + title="Triage the open issues", + description="We have 40 open issues.", + acceptance_criteria=["Issues are labelled"], + task_type=TaskType.TRIAGE, + ) + assert result.agent == AgentType.TIMMY + assert result.success + + async def test_explicit_agent_override(self): + """Caller can force a specific agent regardless of task type.""" + result = await dispatch_task( + title="Triage the open issues", + agent=AgentType.TIMMY, + ) + assert result.agent == AgentType.TIMMY + + async def test_gitea_dispatch_when_issue_provided(self): + client_mock = AsyncMock() + client_mock.__aenter__ = AsyncMock(return_value=client_mock) + client_mock.__aexit__ = AsyncMock(return_value=False) + client_mock.get = AsyncMock(return_value=MagicMock(status_code=200, json=MagicMock(return_value=[]))) + create_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 1})) + apply_resp = MagicMock(status_code=201) + comment_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 5})) + client_mock.post = AsyncMock(side_effect=[create_resp, apply_resp, comment_resp]) + + with ( + patch("httpx.AsyncClient", return_value=client_mock), + patch("timmy.dispatcher.settings", _GITEA_SETTINGS), + ): + result = await dispatch_task( + title="Design the cascade router", + description="Architecture task.", + task_type=TaskType.ARCHITECTURE, + issue_number=1072, + ) + + assert result.agent == AgentType.CLAUDE_CODE + assert result.success + + async def test_escalation_after_max_retries(self): + """If all attempts fail, the result is ESCALATED.""" + with ( + patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch, + patch("timmy.dispatcher._log_escalation", new_callable=AsyncMock), + ): + mock_dispatch.return_value = DispatchResult( + task_type=TaskType.ARCHITECTURE, + agent=AgentType.CLAUDE_CODE, + issue_number=1, + status=DispatchStatus.FAILED, + error="Gitea offline", + ) + result = await dispatch_task( + title="Design router", + task_type=TaskType.ARCHITECTURE, + issue_number=1, + max_retries=1, + ) + + assert result.status == DispatchStatus.ESCALATED + assert mock_dispatch.call_count == 2 # initial + 1 retry + + async def test_no_retry_on_success(self): + with patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch: + mock_dispatch.return_value = DispatchResult( + task_type=TaskType.ARCHITECTURE, + agent=AgentType.CLAUDE_CODE, + issue_number=1, + status=DispatchStatus.ASSIGNED, + comment_id=42, + label_applied="claude-ready", + ) + result = await dispatch_task( + title="Design router", + task_type=TaskType.ARCHITECTURE, + issue_number=1, + max_retries=2, + ) + + assert result.success + assert mock_dispatch.call_count == 1 # no retries needed + + +# --------------------------------------------------------------------------- +# wait_for_completion +# --------------------------------------------------------------------------- + +class TestWaitForCompletion: + async def test_returns_completed_when_issue_closed(self): + closed_resp = MagicMock( + status_code=200, + json=MagicMock(return_value={"state": "closed"}), + ) + client_mock = AsyncMock() + client_mock.__aenter__ = AsyncMock(return_value=client_mock) + client_mock.__aexit__ = AsyncMock(return_value=False) + client_mock.get = AsyncMock(return_value=closed_resp) + + with ( + patch("httpx.AsyncClient", return_value=client_mock), + patch("timmy.dispatcher.settings", _GITEA_SETTINGS), + ): + status = await wait_for_completion(issue_number=42, poll_interval=0, max_wait=5) + + assert status == DispatchStatus.COMPLETED + + async def test_returns_timed_out_when_still_open(self): + open_resp = MagicMock( + status_code=200, + json=MagicMock(return_value={"state": "open"}), + ) + client_mock = AsyncMock() + client_mock.__aenter__ = AsyncMock(return_value=client_mock) + client_mock.__aexit__ = AsyncMock(return_value=False) + client_mock.get = AsyncMock(return_value=open_resp) + + with ( + patch("httpx.AsyncClient", return_value=client_mock), + patch("timmy.dispatcher.settings", _GITEA_SETTINGS), + patch("asyncio.sleep", new_callable=AsyncMock), + ): + status = await wait_for_completion(issue_number=42, poll_interval=1, max_wait=2) + + assert status == DispatchStatus.TIMED_OUT