From 7875e2309e8cdc4cc8aa5a352fd7d2b72825dba6 Mon Sep 17 00:00:00 2001 From: Timmy Time Date: Tue, 24 Mar 2026 21:16:22 +0000 Subject: [PATCH] [loop-cycle-1] refactor: split dispatcher.py into dispatch/ package (#1450) (#1469) --- src/timmy/dispatch/__init__.py | 40 ++ src/timmy/dispatch/assignment.py | 491 +++++++++++++++++ src/timmy/dispatch/queue.py | 198 +++++++ src/timmy/dispatch/routing.py | 230 ++++++++ src/timmy/dispatcher.py | 890 +------------------------------ 5 files changed, 966 insertions(+), 883 deletions(-) create mode 100644 src/timmy/dispatch/__init__.py create mode 100644 src/timmy/dispatch/assignment.py create mode 100644 src/timmy/dispatch/queue.py create mode 100644 src/timmy/dispatch/routing.py diff --git a/src/timmy/dispatch/__init__.py b/src/timmy/dispatch/__init__.py new file mode 100644 index 00000000..2879fdf1 --- /dev/null +++ b/src/timmy/dispatch/__init__.py @@ -0,0 +1,40 @@ +"""Agent dispatch package — split from ``timmy.dispatcher``. + +Re-exports all public (and commonly-tested private) names so that +``from timmy.dispatch import X`` works for every symbol that was +previously available in ``timmy.dispatcher``. +""" + +from .assignment import ( + DispatchResult, + _dispatch_local, + _dispatch_via_api, + _dispatch_via_gitea, + dispatch_task, +) +from .queue import wait_for_completion +from .routing import ( + AGENT_REGISTRY, + AgentSpec, + AgentType, + DispatchStatus, + TaskType, + infer_task_type, + select_agent, +) + +__all__ = [ + "AgentType", + "TaskType", + "DispatchStatus", + "AgentSpec", + "AGENT_REGISTRY", + "DispatchResult", + "select_agent", + "infer_task_type", + "dispatch_task", + "wait_for_completion", + "_dispatch_local", + "_dispatch_via_api", + "_dispatch_via_gitea", +] diff --git a/src/timmy/dispatch/assignment.py b/src/timmy/dispatch/assignment.py new file mode 100644 index 00000000..1ae928bf --- /dev/null +++ b/src/timmy/dispatch/assignment.py @@ -0,0 +1,491 @@ +"""Core dispatch functions — validate, format, and send tasks to agents. + +Contains :func:`dispatch_task` (the primary entry point) and the +per-interface dispatch helpers (:func:`_dispatch_via_gitea`, +:func:`_dispatch_via_api`, :func:`_dispatch_local`). +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from typing import Any + +from config import settings + +from .queue import _apply_gitea_label, _log_escalation, _post_gitea_comment +from .routing import ( + AGENT_REGISTRY, + AgentType, + DispatchStatus, + TaskType, + infer_task_type, + select_agent, +) + +logger = logging.getLogger(__name__) + + +# --------------------------------------------------------------------------- +# Dispatch result +# --------------------------------------------------------------------------- + + +@dataclass +class DispatchResult: + """Outcome of a dispatch call.""" + + task_type: TaskType + agent: AgentType + issue_number: int | None + status: DispatchStatus + comment_id: int | None = None + label_applied: str | None = None + error: str | None = None + retry_count: int = 0 + metadata: dict[str, Any] = field(default_factory=dict) + + @property + def success(self) -> bool: # noqa: D401 + return self.status in (DispatchStatus.ASSIGNED, DispatchStatus.COMPLETED) + + +# --------------------------------------------------------------------------- +# Core dispatch functions +# --------------------------------------------------------------------------- + + +def _format_assignment_comment( + display_name: str, + task_type: TaskType, + description: str, + acceptance_criteria: list[str], +) -> str: + """Build the markdown comment body for a task assignment. + + Args: + display_name: Human-readable agent name. + task_type: The inferred task type. + description: Task description. + acceptance_criteria: List of acceptance criteria strings. + + Returns: + Formatted markdown string for the comment. + """ + criteria_md = ( + "\n".join(f"- {c}" for c in acceptance_criteria) + if acceptance_criteria + else "_None specified_" + ) + return ( + f"## Assigned to {display_name}\n\n" + f"**Task type:** `{task_type.value}`\n\n" + f"**Description:**\n{description}\n\n" + f"**Acceptance criteria:**\n{criteria_md}\n\n" + f"---\n*Dispatched by Timmy agent dispatcher.*" + ) + + +def _select_label(agent: AgentType) -> str | None: + """Return the Gitea label for an agent based on its spec. + + Args: + agent: The target agent. + + Returns: + Label name or None if the agent has no label. + """ + return AGENT_REGISTRY[agent].gitea_label + + +async def _dispatch_via_gitea( + agent: AgentType, + issue_number: int, + title: str, + description: str, + acceptance_criteria: list[str], +) -> DispatchResult: + """Assign a task by applying a Gitea label and posting an assignment comment. + + Args: + agent: Target agent. + issue_number: Gitea issue to assign. + title: Short task title. + description: Full task description. + acceptance_criteria: List of acceptance criteria strings. + + Returns: + :class:`DispatchResult` describing the outcome. + """ + try: + import httpx + except ImportError as exc: + return DispatchResult( + task_type=TaskType.ROUTINE_CODING, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error=f"Missing dependency: {exc}", + ) + + spec = AGENT_REGISTRY[agent] + task_type = infer_task_type(title, description) + + if not settings.gitea_enabled or not settings.gitea_token: + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error="Gitea integration not configured (no token or disabled).", + ) + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + } + + comment_id: int | None = None + label_applied: str | None = None + + async with httpx.AsyncClient(timeout=15) as client: + # 1. Apply agent label (if applicable) + label = _select_label(agent) + if label: + ok = await _apply_gitea_label(client, base_url, repo, headers, issue_number, label) + if ok: + label_applied = label + logger.info( + "Applied label %r to issue #%s for %s", + label, + issue_number, + spec.display_name, + ) + else: + logger.warning( + "Could not apply label %r to issue #%s", + label, + issue_number, + ) + + # 2. Post assignment comment + comment_body = _format_assignment_comment( + spec.display_name, task_type, description, acceptance_criteria + ) + comment_id = await _post_gitea_comment( + client, base_url, repo, headers, issue_number, comment_body + ) + + if comment_id is not None or label_applied is not None: + logger.info( + "Dispatched issue #%s to %s (label=%r, comment=%s)", + issue_number, + spec.display_name, + label_applied, + comment_id, + ) + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.ASSIGNED, + comment_id=comment_id, + label_applied=label_applied, + ) + + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error="Failed to apply label and post comment — check Gitea connectivity.", + ) + + +async def _dispatch_via_api( + agent: AgentType, + title: str, + description: str, + acceptance_criteria: list[str], + issue_number: int | None = None, + endpoint: str | None = None, +) -> DispatchResult: + """Dispatch a task to an external HTTP API agent. + + Args: + agent: Target agent. + title: Short task title. + description: Task description. + acceptance_criteria: List of acceptance criteria. + issue_number: Optional Gitea issue for cross-referencing. + endpoint: Override API endpoint URL (uses spec default if omitted). + + Returns: + :class:`DispatchResult` describing the outcome. + """ + spec = AGENT_REGISTRY[agent] + task_type = infer_task_type(title, description) + url = endpoint or spec.api_endpoint + + if not url: + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error=f"No API endpoint configured for agent {agent.value}.", + ) + + payload = { + "title": title, + "description": description, + "acceptance_criteria": acceptance_criteria, + "issue_number": issue_number, + "agent": agent.value, + "task_type": task_type.value, + } + + try: + import httpx + + async with httpx.AsyncClient(timeout=30) as client: + resp = await client.post(url, json=payload) + + if resp.status_code in (200, 201, 202): + logger.info("Dispatched %r to API agent %s at %s", title[:60], agent.value, url) + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.ASSIGNED, + metadata={"response": resp.json() if resp.content else {}}, + ) + + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error=f"API agent returned {resp.status_code}: {resp.text[:200]}", + ) + except Exception as exc: + logger.warning("API dispatch to %s failed: %s", url, exc) + return DispatchResult( + task_type=task_type, + agent=agent, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error=str(exc), + ) + + +async def _dispatch_local( + title: str, + description: str = "", + acceptance_criteria: list[str] | None = None, + issue_number: int | None = None, +) -> DispatchResult: + """Handle a task locally — Timmy processes it directly. + + This is a lightweight stub. Real local execution should be wired + into the agentic loop or a dedicated Timmy tool. + + Args: + title: Short task title. + description: Task description. + acceptance_criteria: Acceptance criteria list. + issue_number: Optional Gitea issue number for logging. + + Returns: + :class:`DispatchResult` with ASSIGNED status (local execution is + assumed to succeed at dispatch time). + """ + task_type = infer_task_type(title, description) + logger.info("Timmy handling task locally: %r (issue #%s)", title[:60], issue_number) + return DispatchResult( + task_type=task_type, + agent=AgentType.TIMMY, + issue_number=issue_number, + status=DispatchStatus.ASSIGNED, + metadata={"local": True, "description": description}, + ) + + +# --------------------------------------------------------------------------- +# Public entry point +# --------------------------------------------------------------------------- + + +def _validate_task( + title: str, + task_type: TaskType | None, + agent: AgentType | None, + issue_number: int | None, +) -> DispatchResult | None: + """Validate task preconditions. + + Args: + title: Task title to validate. + task_type: Optional task type for result construction. + agent: Optional agent for result construction. + issue_number: Optional issue number for result construction. + + Returns: + A failed DispatchResult if validation fails, None otherwise. + """ + if not title.strip(): + return DispatchResult( + task_type=task_type or TaskType.ROUTINE_CODING, + agent=agent or AgentType.TIMMY, + issue_number=issue_number, + status=DispatchStatus.FAILED, + error="`title` is required.", + ) + return None + + +def _select_dispatch_strategy(agent: AgentType, issue_number: int | None) -> str: + """Select the dispatch strategy based on agent interface and context. + + Args: + agent: The target agent. + issue_number: Optional Gitea issue number. + + Returns: + Strategy name: "gitea", "api", or "local". + """ + spec = AGENT_REGISTRY[agent] + if spec.interface == "gitea" and issue_number is not None: + return "gitea" + if spec.interface == "api": + return "api" + return "local" + + +def _log_dispatch_result( + title: str, + result: DispatchResult, + attempt: int, + max_retries: int, +) -> None: + """Log the outcome of a dispatch attempt. + + Args: + title: Task title for logging context. + result: The dispatch result. + attempt: Current attempt number (0-indexed). + max_retries: Maximum retry attempts allowed. + """ + if result.success: + return + + if attempt > 0: + logger.info("Retry %d/%d for task %r", attempt, max_retries, title[:60]) + + logger.warning( + "Dispatch attempt %d failed for task %r: %s", + attempt + 1, + title[:60], + result.error, + ) + + +async def dispatch_task( + title: str, + description: str = "", + acceptance_criteria: list[str] | None = None, + task_type: TaskType | None = None, + agent: AgentType | None = None, + issue_number: int | None = None, + api_endpoint: str | None = None, + max_retries: int = 1, +) -> DispatchResult: + """Route a task to the best available agent. + + This is the primary entry point. Callers can either specify the + *agent* and *task_type* explicitly or let the dispatcher infer them + from the *title* and *description*. + + Args: + title: Short human-readable task title. + description: Full task description with context. + acceptance_criteria: List of acceptance criteria strings. + task_type: Override automatic task type inference. + agent: Override automatic agent selection. + issue_number: Gitea issue number to log the assignment on. + api_endpoint: Override API endpoint for AGENT_API dispatches. + max_retries: Number of retry attempts on failure (default 1). + + Returns: + :class:`DispatchResult` describing the final dispatch outcome. + + Example:: + + result = await dispatch_task( + issue_number=1072, + title="Build the cascade LLM router", + description="We need automatic failover...", + acceptance_criteria=["Circuit breaker works", "Metrics exposed"], + ) + if result.success: + print(f"Assigned to {result.agent.value}") + """ + # 1. Validate + validation_error = _validate_task(title, task_type, agent, issue_number) + if validation_error: + return validation_error + + # 2. Resolve task type and agent + criteria = acceptance_criteria or [] + resolved_type = task_type or infer_task_type(title, description) + resolved_agent = agent or select_agent(resolved_type) + + logger.info( + "Dispatching task %r → %s (type=%s, issue=#%s)", + title[:60], + resolved_agent.value, + resolved_type.value, + issue_number, + ) + + # 3. Select strategy and dispatch with retries + strategy = _select_dispatch_strategy(resolved_agent, issue_number) + last_result: DispatchResult | None = None + + for attempt in range(max_retries + 1): + if strategy == "gitea": + result = await _dispatch_via_gitea( + resolved_agent, issue_number, title, description, criteria + ) + elif strategy == "api": + result = await _dispatch_via_api( + resolved_agent, title, description, criteria, issue_number, api_endpoint + ) + else: + result = await _dispatch_local(title, description, criteria, issue_number) + + result.retry_count = attempt + last_result = result + + if result.success: + return result + + _log_dispatch_result(title, result, attempt, max_retries) + + # 4. All attempts exhausted — escalate + assert last_result is not None + last_result.status = DispatchStatus.ESCALATED + logger.error( + "Task %r escalated after %d failed attempt(s): %s", + title[:60], + max_retries + 1, + last_result.error, + ) + + # Try to log the escalation on the issue + if issue_number is not None: + await _log_escalation(issue_number, resolved_agent, last_result.error or "unknown error") + + return last_result diff --git a/src/timmy/dispatch/queue.py b/src/timmy/dispatch/queue.py new file mode 100644 index 00000000..01074b3b --- /dev/null +++ b/src/timmy/dispatch/queue.py @@ -0,0 +1,198 @@ +"""Gitea polling and comment helpers for task dispatch. + +Provides low-level helpers that interact with the Gitea API to post +comments, apply labels, poll for issue completion, and log escalations. +""" + +from __future__ import annotations + +import asyncio +import logging +from typing import Any + +from config import settings + +from .routing import AGENT_REGISTRY, AgentType, DispatchStatus + +logger = logging.getLogger(__name__) + + +async def _post_gitea_comment( + client: Any, + base_url: str, + repo: str, + headers: dict[str, str], + issue_number: int, + body: str, +) -> int | None: + """Post a comment on a Gitea issue and return the comment ID.""" + try: + resp = await client.post( + f"{base_url}/repos/{repo}/issues/{issue_number}/comments", + headers=headers, + json={"body": body}, + ) + if resp.status_code in (200, 201): + return resp.json().get("id") + logger.warning( + "Comment on #%s returned %s: %s", + issue_number, + resp.status_code, + resp.text[:200], + ) + except Exception as exc: + logger.warning("Failed to post comment on #%s: %s", issue_number, exc) + return None + + +async def _apply_gitea_label( + client: Any, + base_url: str, + repo: str, + headers: dict[str, str], + issue_number: int, + label_name: str, + label_color: str = "#0075ca", +) -> bool: + """Ensure *label_name* exists and apply it to an issue. + + Returns True if the label was successfully applied. + """ + # Resolve or create the label + label_id: int | None = None + try: + resp = await client.get(f"{base_url}/repos/{repo}/labels", headers=headers) + if resp.status_code == 200: + for lbl in resp.json(): + if lbl.get("name") == label_name: + label_id = lbl["id"] + break + except Exception as exc: + logger.warning("Failed to list labels: %s", exc) + return False + + if label_id is None: + try: + resp = await client.post( + f"{base_url}/repos/{repo}/labels", + headers=headers, + json={"name": label_name, "color": label_color}, + ) + if resp.status_code in (200, 201): + label_id = resp.json().get("id") + except Exception as exc: + logger.warning("Failed to create label %r: %s", label_name, exc) + return False + + if label_id is None: + return False + + # Apply label to the issue + try: + resp = await client.post( + f"{base_url}/repos/{repo}/issues/{issue_number}/labels", + headers=headers, + json={"labels": [label_id]}, + ) + return resp.status_code in (200, 201) + except Exception as exc: + logger.warning("Failed to apply label %r to #%s: %s", label_name, issue_number, exc) + return False + + +async def _poll_issue_completion( + issue_number: int, + poll_interval: int = 60, + max_wait: int = 7200, +) -> DispatchStatus: + """Poll a Gitea issue until closed (completed) or timeout. + + Args: + issue_number: Gitea issue to watch. + poll_interval: Seconds between polls. + max_wait: Maximum total seconds to wait. + + Returns: + :attr:`DispatchStatus.COMPLETED` if the issue was closed, + :attr:`DispatchStatus.TIMED_OUT` otherwise. + """ + try: + import httpx + except ImportError as exc: + logger.warning("poll_issue_completion: missing dependency: %s", exc) + return DispatchStatus.FAILED + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = {"Authorization": f"token {settings.gitea_token}"} + issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}" + + elapsed = 0 + while elapsed < max_wait: + try: + async with httpx.AsyncClient(timeout=10) as client: + resp = await client.get(issue_url, headers=headers) + if resp.status_code == 200 and resp.json().get("state") == "closed": + logger.info("Issue #%s closed — task completed", issue_number) + return DispatchStatus.COMPLETED + except Exception as exc: + logger.warning("Poll error for issue #%s: %s", issue_number, exc) + + await asyncio.sleep(poll_interval) + elapsed += poll_interval + + logger.warning("Timed out waiting for issue #%s after %ss", issue_number, max_wait) + return DispatchStatus.TIMED_OUT + + +async def _log_escalation( + issue_number: int, + agent: AgentType, + error: str, +) -> None: + """Post an escalation notice on the Gitea issue.""" + try: + import httpx + + if not settings.gitea_enabled or not settings.gitea_token: + return + + base_url = f"{settings.gitea_url}/api/v1" + repo = settings.gitea_repo + headers = { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + } + body = ( + f"## Dispatch Escalated\n\n" + f"Could not assign to **{AGENT_REGISTRY[agent].display_name}** " + f"after {1} attempt(s).\n\n" + f"**Error:** {error}\n\n" + f"Manual intervention required.\n\n" + f"---\n*Timmy agent dispatcher.*" + ) + async with httpx.AsyncClient(timeout=10) as client: + await _post_gitea_comment(client, base_url, repo, headers, issue_number, body) + except Exception as exc: + logger.warning("Failed to post escalation comment: %s", exc) + + +async def wait_for_completion( + issue_number: int, + poll_interval: int = 60, + max_wait: int = 7200, +) -> DispatchStatus: + """Block until the assigned Gitea issue is closed or the timeout fires. + + Useful for synchronous orchestration where the caller wants to wait for + the assigned agent to finish before proceeding. + + Args: + issue_number: Gitea issue to monitor. + poll_interval: Seconds between status polls. + max_wait: Maximum wait in seconds (default 2 hours). + + Returns: + :attr:`DispatchStatus.COMPLETED` or :attr:`DispatchStatus.TIMED_OUT`. + """ + return await _poll_issue_completion(issue_number, poll_interval, max_wait) diff --git a/src/timmy/dispatch/routing.py b/src/timmy/dispatch/routing.py new file mode 100644 index 00000000..4f690dba --- /dev/null +++ b/src/timmy/dispatch/routing.py @@ -0,0 +1,230 @@ +"""Routing logic — enums, agent registry, and task-to-agent mapping. + +Defines the core types (:class:`AgentType`, :class:`TaskType`, +:class:`DispatchStatus`), the :data:`AGENT_REGISTRY`, and the functions +that decide which agent handles a given task. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass +from enum import StrEnum + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# Enumerations +# --------------------------------------------------------------------------- + + +class AgentType(StrEnum): + """Known agents in the swarm.""" + + CLAUDE_CODE = "claude_code" + KIMI_CODE = "kimi_code" + AGENT_API = "agent_api" + TIMMY = "timmy" + + +class TaskType(StrEnum): + """Categories of engineering work.""" + + # Claude Code strengths + ARCHITECTURE = "architecture" + REFACTORING = "refactoring" + COMPLEX_REASONING = "complex_reasoning" + CODE_REVIEW = "code_review" + + # Kimi Code strengths + PARALLEL_IMPLEMENTATION = "parallel_implementation" + ROUTINE_CODING = "routine_coding" + FAST_ITERATION = "fast_iteration" + + # Agent API strengths + RESEARCH = "research" + ANALYSIS = "analysis" + SPECIALIZED = "specialized" + + # Timmy strengths + TRIAGE = "triage" + PLANNING = "planning" + CREATIVE = "creative" + ORCHESTRATION = "orchestration" + + +class DispatchStatus(StrEnum): + """Lifecycle state of a dispatched task.""" + + PENDING = "pending" + ASSIGNED = "assigned" + IN_PROGRESS = "in_progress" + COMPLETED = "completed" + FAILED = "failed" + ESCALATED = "escalated" + TIMED_OUT = "timed_out" + + +# --------------------------------------------------------------------------- +# Agent registry +# --------------------------------------------------------------------------- + + +@dataclass +class AgentSpec: + """Capabilities and limits for a single agent.""" + + name: AgentType + display_name: str + strengths: frozenset[TaskType] + gitea_label: str | None # label to apply when dispatching + max_concurrent: int = 1 + interface: str = "gitea" # "gitea" | "api" | "local" + api_endpoint: str | None = None # for interface="api" + + +#: Authoritative agent registry — all known agents and their capabilities. +AGENT_REGISTRY: dict[AgentType, AgentSpec] = { + AgentType.CLAUDE_CODE: AgentSpec( + name=AgentType.CLAUDE_CODE, + display_name="Claude Code", + strengths=frozenset( + { + TaskType.ARCHITECTURE, + TaskType.REFACTORING, + TaskType.COMPLEX_REASONING, + TaskType.CODE_REVIEW, + } + ), + gitea_label="claude-ready", + max_concurrent=1, + interface="gitea", + ), + AgentType.KIMI_CODE: AgentSpec( + name=AgentType.KIMI_CODE, + display_name="Kimi Code", + strengths=frozenset( + { + TaskType.PARALLEL_IMPLEMENTATION, + TaskType.ROUTINE_CODING, + TaskType.FAST_ITERATION, + } + ), + gitea_label="kimi-ready", + max_concurrent=1, + interface="gitea", + ), + AgentType.AGENT_API: AgentSpec( + name=AgentType.AGENT_API, + display_name="Agent API", + strengths=frozenset( + { + TaskType.RESEARCH, + TaskType.ANALYSIS, + TaskType.SPECIALIZED, + } + ), + gitea_label=None, + max_concurrent=5, + interface="api", + ), + AgentType.TIMMY: AgentSpec( + name=AgentType.TIMMY, + display_name="Timmy", + strengths=frozenset( + { + TaskType.TRIAGE, + TaskType.PLANNING, + TaskType.CREATIVE, + TaskType.ORCHESTRATION, + } + ), + gitea_label=None, + max_concurrent=1, + interface="local", + ), +} + +#: Map from task type to preferred agent (primary routing table). +_TASK_ROUTING: dict[TaskType, AgentType] = { + TaskType.ARCHITECTURE: AgentType.CLAUDE_CODE, + TaskType.REFACTORING: AgentType.CLAUDE_CODE, + TaskType.COMPLEX_REASONING: AgentType.CLAUDE_CODE, + TaskType.CODE_REVIEW: AgentType.CLAUDE_CODE, + TaskType.PARALLEL_IMPLEMENTATION: AgentType.KIMI_CODE, + TaskType.ROUTINE_CODING: AgentType.KIMI_CODE, + TaskType.FAST_ITERATION: AgentType.KIMI_CODE, + TaskType.RESEARCH: AgentType.AGENT_API, + TaskType.ANALYSIS: AgentType.AGENT_API, + TaskType.SPECIALIZED: AgentType.AGENT_API, + TaskType.TRIAGE: AgentType.TIMMY, + TaskType.PLANNING: AgentType.TIMMY, + TaskType.CREATIVE: AgentType.TIMMY, + TaskType.ORCHESTRATION: AgentType.TIMMY, +} + + +# --------------------------------------------------------------------------- +# Routing logic +# --------------------------------------------------------------------------- + + +def select_agent(task_type: TaskType) -> AgentType: + """Return the best agent for *task_type* based on the routing table. + + Args: + task_type: The category of engineering work to be done. + + Returns: + The :class:`AgentType` best suited to handle this task. + """ + return _TASK_ROUTING.get(task_type, AgentType.TIMMY) + + +def infer_task_type(title: str, description: str = "") -> TaskType: + """Heuristic: guess the most appropriate :class:`TaskType` from text. + + Scans *title* and *description* for keyword signals and returns the + strongest match. Falls back to :attr:`TaskType.ROUTINE_CODING`. + + Args: + title: Short task title. + description: Longer task description (optional). + + Returns: + The inferred :class:`TaskType`. + """ + text = (title + " " + description).lower() + + _SIGNALS: list[tuple[TaskType, frozenset[str]]] = [ + ( + TaskType.ARCHITECTURE, + frozenset({"architect", "design", "adr", "system design", "schema"}), + ), + ( + TaskType.REFACTORING, + frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"}), + ), + (TaskType.CODE_REVIEW, frozenset({"review", "pr review", "pull request review", "audit"})), + ( + TaskType.COMPLEX_REASONING, + frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"}), + ), + ( + TaskType.RESEARCH, + frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"}), + ), + (TaskType.ANALYSIS, frozenset({"analysis", "profil", "trace", "metric", "performance"})), + (TaskType.TRIAGE, frozenset({"triage", "classify", "prioritise", "prioritize"})), + (TaskType.PLANNING, frozenset({"plan", "roadmap", "milestone", "epic", "spike"})), + (TaskType.CREATIVE, frozenset({"creative", "persona", "story", "write", "draft"})), + (TaskType.ORCHESTRATION, frozenset({"orchestrat", "coordinat", "swarm", "dispatch"})), + (TaskType.PARALLEL_IMPLEMENTATION, frozenset({"parallel", "concurrent", "batch"})), + (TaskType.FAST_ITERATION, frozenset({"quick", "fast", "iterate", "prototype", "poc"})), + ] + + for task_type, keywords in _SIGNALS: + if any(kw in text for kw in keywords): + return task_type + + return TaskType.ROUTINE_CODING diff --git a/src/timmy/dispatcher.py b/src/timmy/dispatcher.py index b2f6edd9..99da0d62 100644 --- a/src/timmy/dispatcher.py +++ b/src/timmy/dispatcher.py @@ -30,888 +30,12 @@ Usage:: description="We need a cascade router...", acceptance_criteria=["Failover works", "Metrics exposed"], ) + +.. note:: + + This module is a backward-compatibility shim. The implementation now + lives in :mod:`timmy.dispatch`. All public *and* private names that + tests rely on are re-exported here. """ -from __future__ import annotations - -import asyncio -import logging -from dataclasses import dataclass, field -from enum import StrEnum -from typing import Any - -from config import settings - -logger = logging.getLogger(__name__) - -# --------------------------------------------------------------------------- -# Enumerations -# --------------------------------------------------------------------------- - - -class AgentType(StrEnum): - """Known agents in the swarm.""" - - CLAUDE_CODE = "claude_code" - KIMI_CODE = "kimi_code" - AGENT_API = "agent_api" - TIMMY = "timmy" - - -class TaskType(StrEnum): - """Categories of engineering work.""" - - # Claude Code strengths - ARCHITECTURE = "architecture" - REFACTORING = "refactoring" - COMPLEX_REASONING = "complex_reasoning" - CODE_REVIEW = "code_review" - - # Kimi Code strengths - PARALLEL_IMPLEMENTATION = "parallel_implementation" - ROUTINE_CODING = "routine_coding" - FAST_ITERATION = "fast_iteration" - - # Agent API strengths - RESEARCH = "research" - ANALYSIS = "analysis" - SPECIALIZED = "specialized" - - # Timmy strengths - TRIAGE = "triage" - PLANNING = "planning" - CREATIVE = "creative" - ORCHESTRATION = "orchestration" - - -class DispatchStatus(StrEnum): - """Lifecycle state of a dispatched task.""" - - PENDING = "pending" - ASSIGNED = "assigned" - IN_PROGRESS = "in_progress" - COMPLETED = "completed" - FAILED = "failed" - ESCALATED = "escalated" - TIMED_OUT = "timed_out" - - -# --------------------------------------------------------------------------- -# Agent registry -# --------------------------------------------------------------------------- - - -@dataclass -class AgentSpec: - """Capabilities and limits for a single agent.""" - - name: AgentType - display_name: str - strengths: frozenset[TaskType] - gitea_label: str | None # label to apply when dispatching - max_concurrent: int = 1 - interface: str = "gitea" # "gitea" | "api" | "local" - api_endpoint: str | None = None # for interface="api" - - -#: Authoritative agent registry — all known agents and their capabilities. -AGENT_REGISTRY: dict[AgentType, AgentSpec] = { - AgentType.CLAUDE_CODE: AgentSpec( - name=AgentType.CLAUDE_CODE, - display_name="Claude Code", - strengths=frozenset( - { - TaskType.ARCHITECTURE, - TaskType.REFACTORING, - TaskType.COMPLEX_REASONING, - TaskType.CODE_REVIEW, - } - ), - gitea_label="claude-ready", - max_concurrent=1, - interface="gitea", - ), - AgentType.KIMI_CODE: AgentSpec( - name=AgentType.KIMI_CODE, - display_name="Kimi Code", - strengths=frozenset( - { - TaskType.PARALLEL_IMPLEMENTATION, - TaskType.ROUTINE_CODING, - TaskType.FAST_ITERATION, - } - ), - gitea_label="kimi-ready", - max_concurrent=1, - interface="gitea", - ), - AgentType.AGENT_API: AgentSpec( - name=AgentType.AGENT_API, - display_name="Agent API", - strengths=frozenset( - { - TaskType.RESEARCH, - TaskType.ANALYSIS, - TaskType.SPECIALIZED, - } - ), - gitea_label=None, - max_concurrent=5, - interface="api", - ), - AgentType.TIMMY: AgentSpec( - name=AgentType.TIMMY, - display_name="Timmy", - strengths=frozenset( - { - TaskType.TRIAGE, - TaskType.PLANNING, - TaskType.CREATIVE, - TaskType.ORCHESTRATION, - } - ), - gitea_label=None, - max_concurrent=1, - interface="local", - ), -} - -#: Map from task type to preferred agent (primary routing table). -_TASK_ROUTING: dict[TaskType, AgentType] = { - TaskType.ARCHITECTURE: AgentType.CLAUDE_CODE, - TaskType.REFACTORING: AgentType.CLAUDE_CODE, - TaskType.COMPLEX_REASONING: AgentType.CLAUDE_CODE, - TaskType.CODE_REVIEW: AgentType.CLAUDE_CODE, - TaskType.PARALLEL_IMPLEMENTATION: AgentType.KIMI_CODE, - TaskType.ROUTINE_CODING: AgentType.KIMI_CODE, - TaskType.FAST_ITERATION: AgentType.KIMI_CODE, - TaskType.RESEARCH: AgentType.AGENT_API, - TaskType.ANALYSIS: AgentType.AGENT_API, - TaskType.SPECIALIZED: AgentType.AGENT_API, - TaskType.TRIAGE: AgentType.TIMMY, - TaskType.PLANNING: AgentType.TIMMY, - TaskType.CREATIVE: AgentType.TIMMY, - TaskType.ORCHESTRATION: AgentType.TIMMY, -} - - -# --------------------------------------------------------------------------- -# Dispatch result -# --------------------------------------------------------------------------- - - -@dataclass -class DispatchResult: - """Outcome of a dispatch call.""" - - task_type: TaskType - agent: AgentType - issue_number: int | None - status: DispatchStatus - comment_id: int | None = None - label_applied: str | None = None - error: str | None = None - retry_count: int = 0 - metadata: dict[str, Any] = field(default_factory=dict) - - @property - def success(self) -> bool: # noqa: D401 - return self.status in (DispatchStatus.ASSIGNED, DispatchStatus.COMPLETED) - - -# --------------------------------------------------------------------------- -# Routing logic -# --------------------------------------------------------------------------- - - -def select_agent(task_type: TaskType) -> AgentType: - """Return the best agent for *task_type* based on the routing table. - - Args: - task_type: The category of engineering work to be done. - - Returns: - The :class:`AgentType` best suited to handle this task. - """ - return _TASK_ROUTING.get(task_type, AgentType.TIMMY) - - -def infer_task_type(title: str, description: str = "") -> TaskType: - """Heuristic: guess the most appropriate :class:`TaskType` from text. - - Scans *title* and *description* for keyword signals and returns the - strongest match. Falls back to :attr:`TaskType.ROUTINE_CODING`. - - Args: - title: Short task title. - description: Longer task description (optional). - - Returns: - The inferred :class:`TaskType`. - """ - text = (title + " " + description).lower() - - _SIGNALS: list[tuple[TaskType, frozenset[str]]] = [ - ( - TaskType.ARCHITECTURE, - frozenset({"architect", "design", "adr", "system design", "schema"}), - ), - ( - TaskType.REFACTORING, - frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"}), - ), - (TaskType.CODE_REVIEW, frozenset({"review", "pr review", "pull request review", "audit"})), - ( - TaskType.COMPLEX_REASONING, - frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"}), - ), - ( - TaskType.RESEARCH, - frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"}), - ), - (TaskType.ANALYSIS, frozenset({"analysis", "profil", "trace", "metric", "performance"})), - (TaskType.TRIAGE, frozenset({"triage", "classify", "prioritise", "prioritize"})), - (TaskType.PLANNING, frozenset({"plan", "roadmap", "milestone", "epic", "spike"})), - (TaskType.CREATIVE, frozenset({"creative", "persona", "story", "write", "draft"})), - (TaskType.ORCHESTRATION, frozenset({"orchestrat", "coordinat", "swarm", "dispatch"})), - (TaskType.PARALLEL_IMPLEMENTATION, frozenset({"parallel", "concurrent", "batch"})), - (TaskType.FAST_ITERATION, frozenset({"quick", "fast", "iterate", "prototype", "poc"})), - ] - - for task_type, keywords in _SIGNALS: - if any(kw in text for kw in keywords): - return task_type - - return TaskType.ROUTINE_CODING - - -# --------------------------------------------------------------------------- -# Gitea helpers -# --------------------------------------------------------------------------- - - -async def _post_gitea_comment( - client: Any, - base_url: str, - repo: str, - headers: dict[str, str], - issue_number: int, - body: str, -) -> int | None: - """Post a comment on a Gitea issue and return the comment ID.""" - try: - resp = await client.post( - f"{base_url}/repos/{repo}/issues/{issue_number}/comments", - headers=headers, - json={"body": body}, - ) - if resp.status_code in (200, 201): - return resp.json().get("id") - logger.warning( - "Comment on #%s returned %s: %s", - issue_number, - resp.status_code, - resp.text[:200], - ) - except Exception as exc: - logger.warning("Failed to post comment on #%s: %s", issue_number, exc) - return None - - -async def _apply_gitea_label( - client: Any, - base_url: str, - repo: str, - headers: dict[str, str], - issue_number: int, - label_name: str, - label_color: str = "#0075ca", -) -> bool: - """Ensure *label_name* exists and apply it to an issue. - - Returns True if the label was successfully applied. - """ - # Resolve or create the label - label_id: int | None = None - try: - resp = await client.get(f"{base_url}/repos/{repo}/labels", headers=headers) - if resp.status_code == 200: - for lbl in resp.json(): - if lbl.get("name") == label_name: - label_id = lbl["id"] - break - except Exception as exc: - logger.warning("Failed to list labels: %s", exc) - return False - - if label_id is None: - try: - resp = await client.post( - f"{base_url}/repos/{repo}/labels", - headers=headers, - json={"name": label_name, "color": label_color}, - ) - if resp.status_code in (200, 201): - label_id = resp.json().get("id") - except Exception as exc: - logger.warning("Failed to create label %r: %s", label_name, exc) - return False - - if label_id is None: - return False - - # Apply label to the issue - try: - resp = await client.post( - f"{base_url}/repos/{repo}/issues/{issue_number}/labels", - headers=headers, - json={"labels": [label_id]}, - ) - return resp.status_code in (200, 201) - except Exception as exc: - logger.warning("Failed to apply label %r to #%s: %s", label_name, issue_number, exc) - return False - - -async def _poll_issue_completion( - issue_number: int, - poll_interval: int = 60, - max_wait: int = 7200, -) -> DispatchStatus: - """Poll a Gitea issue until closed (completed) or timeout. - - Args: - issue_number: Gitea issue to watch. - poll_interval: Seconds between polls. - max_wait: Maximum total seconds to wait. - - Returns: - :attr:`DispatchStatus.COMPLETED` if the issue was closed, - :attr:`DispatchStatus.TIMED_OUT` otherwise. - """ - try: - import httpx - except ImportError as exc: - logger.warning("poll_issue_completion: missing dependency: %s", exc) - return DispatchStatus.FAILED - - base_url = f"{settings.gitea_url}/api/v1" - repo = settings.gitea_repo - headers = {"Authorization": f"token {settings.gitea_token}"} - issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}" - - elapsed = 0 - while elapsed < max_wait: - try: - async with httpx.AsyncClient(timeout=10) as client: - resp = await client.get(issue_url, headers=headers) - if resp.status_code == 200 and resp.json().get("state") == "closed": - logger.info("Issue #%s closed — task completed", issue_number) - return DispatchStatus.COMPLETED - except Exception as exc: - logger.warning("Poll error for issue #%s: %s", issue_number, exc) - - await asyncio.sleep(poll_interval) - elapsed += poll_interval - - logger.warning("Timed out waiting for issue #%s after %ss", issue_number, max_wait) - return DispatchStatus.TIMED_OUT - - -# --------------------------------------------------------------------------- -# Core dispatch functions -# --------------------------------------------------------------------------- - - -def _format_assignment_comment( - display_name: str, - task_type: TaskType, - description: str, - acceptance_criteria: list[str], -) -> str: - """Build the markdown comment body for a task assignment. - - Args: - display_name: Human-readable agent name. - task_type: The inferred task type. - description: Task description. - acceptance_criteria: List of acceptance criteria strings. - - Returns: - Formatted markdown string for the comment. - """ - criteria_md = ( - "\n".join(f"- {c}" for c in acceptance_criteria) - if acceptance_criteria - else "_None specified_" - ) - return ( - f"## Assigned to {display_name}\n\n" - f"**Task type:** `{task_type.value}`\n\n" - f"**Description:**\n{description}\n\n" - f"**Acceptance criteria:**\n{criteria_md}\n\n" - f"---\n*Dispatched by Timmy agent dispatcher.*" - ) - - -def _select_label(agent: AgentType) -> str | None: - """Return the Gitea label for an agent based on its spec. - - Args: - agent: The target agent. - - Returns: - Label name or None if the agent has no label. - """ - return AGENT_REGISTRY[agent].gitea_label - - -async def _dispatch_via_gitea( - agent: AgentType, - issue_number: int, - title: str, - description: str, - acceptance_criteria: list[str], -) -> DispatchResult: - """Assign a task by applying a Gitea label and posting an assignment comment. - - Args: - agent: Target agent. - issue_number: Gitea issue to assign. - title: Short task title. - description: Full task description. - acceptance_criteria: List of acceptance criteria strings. - - Returns: - :class:`DispatchResult` describing the outcome. - """ - try: - import httpx - except ImportError as exc: - return DispatchResult( - task_type=TaskType.ROUTINE_CODING, - agent=agent, - issue_number=issue_number, - status=DispatchStatus.FAILED, - error=f"Missing dependency: {exc}", - ) - - spec = AGENT_REGISTRY[agent] - task_type = infer_task_type(title, description) - - if not settings.gitea_enabled or not settings.gitea_token: - return DispatchResult( - task_type=task_type, - agent=agent, - issue_number=issue_number, - status=DispatchStatus.FAILED, - error="Gitea integration not configured (no token or disabled).", - ) - - base_url = f"{settings.gitea_url}/api/v1" - repo = settings.gitea_repo - headers = { - "Authorization": f"token {settings.gitea_token}", - "Content-Type": "application/json", - } - - comment_id: int | None = None - label_applied: str | None = None - - async with httpx.AsyncClient(timeout=15) as client: - # 1. Apply agent label (if applicable) - label = _select_label(agent) - if label: - ok = await _apply_gitea_label(client, base_url, repo, headers, issue_number, label) - if ok: - label_applied = label - logger.info( - "Applied label %r to issue #%s for %s", - label, - issue_number, - spec.display_name, - ) - else: - logger.warning( - "Could not apply label %r to issue #%s", - label, - issue_number, - ) - - # 2. Post assignment comment - comment_body = _format_assignment_comment( - spec.display_name, task_type, description, acceptance_criteria - ) - comment_id = await _post_gitea_comment( - client, base_url, repo, headers, issue_number, comment_body - ) - - if comment_id is not None or label_applied is not None: - logger.info( - "Dispatched issue #%s to %s (label=%r, comment=%s)", - issue_number, - spec.display_name, - label_applied, - comment_id, - ) - return DispatchResult( - task_type=task_type, - agent=agent, - issue_number=issue_number, - status=DispatchStatus.ASSIGNED, - comment_id=comment_id, - label_applied=label_applied, - ) - - return DispatchResult( - task_type=task_type, - agent=agent, - issue_number=issue_number, - status=DispatchStatus.FAILED, - error="Failed to apply label and post comment — check Gitea connectivity.", - ) - - -async def _dispatch_via_api( - agent: AgentType, - title: str, - description: str, - acceptance_criteria: list[str], - issue_number: int | None = None, - endpoint: str | None = None, -) -> DispatchResult: - """Dispatch a task to an external HTTP API agent. - - Args: - agent: Target agent. - title: Short task title. - description: Task description. - acceptance_criteria: List of acceptance criteria. - issue_number: Optional Gitea issue for cross-referencing. - endpoint: Override API endpoint URL (uses spec default if omitted). - - Returns: - :class:`DispatchResult` describing the outcome. - """ - spec = AGENT_REGISTRY[agent] - task_type = infer_task_type(title, description) - url = endpoint or spec.api_endpoint - - if not url: - return DispatchResult( - task_type=task_type, - agent=agent, - issue_number=issue_number, - status=DispatchStatus.FAILED, - error=f"No API endpoint configured for agent {agent.value}.", - ) - - payload = { - "title": title, - "description": description, - "acceptance_criteria": acceptance_criteria, - "issue_number": issue_number, - "agent": agent.value, - "task_type": task_type.value, - } - - try: - import httpx - - async with httpx.AsyncClient(timeout=30) as client: - resp = await client.post(url, json=payload) - - if resp.status_code in (200, 201, 202): - logger.info("Dispatched %r to API agent %s at %s", title[:60], agent.value, url) - return DispatchResult( - task_type=task_type, - agent=agent, - issue_number=issue_number, - status=DispatchStatus.ASSIGNED, - metadata={"response": resp.json() if resp.content else {}}, - ) - - return DispatchResult( - task_type=task_type, - agent=agent, - issue_number=issue_number, - status=DispatchStatus.FAILED, - error=f"API agent returned {resp.status_code}: {resp.text[:200]}", - ) - except Exception as exc: - logger.warning("API dispatch to %s failed: %s", url, exc) - return DispatchResult( - task_type=task_type, - agent=agent, - issue_number=issue_number, - status=DispatchStatus.FAILED, - error=str(exc), - ) - - -async def _dispatch_local( - title: str, - description: str = "", - acceptance_criteria: list[str] | None = None, - issue_number: int | None = None, -) -> DispatchResult: - """Handle a task locally — Timmy processes it directly. - - This is a lightweight stub. Real local execution should be wired - into the agentic loop or a dedicated Timmy tool. - - Args: - title: Short task title. - description: Task description. - acceptance_criteria: Acceptance criteria list. - issue_number: Optional Gitea issue number for logging. - - Returns: - :class:`DispatchResult` with ASSIGNED status (local execution is - assumed to succeed at dispatch time). - """ - task_type = infer_task_type(title, description) - logger.info("Timmy handling task locally: %r (issue #%s)", title[:60], issue_number) - return DispatchResult( - task_type=task_type, - agent=AgentType.TIMMY, - issue_number=issue_number, - status=DispatchStatus.ASSIGNED, - metadata={"local": True, "description": description}, - ) - - -# --------------------------------------------------------------------------- -# Public entry point -# --------------------------------------------------------------------------- - - -def _validate_task( - title: str, - task_type: TaskType | None, - agent: AgentType | None, - issue_number: int | None, -) -> DispatchResult | None: - """Validate task preconditions. - - Args: - title: Task title to validate. - task_type: Optional task type for result construction. - agent: Optional agent for result construction. - issue_number: Optional issue number for result construction. - - Returns: - A failed DispatchResult if validation fails, None otherwise. - """ - if not title.strip(): - return DispatchResult( - task_type=task_type or TaskType.ROUTINE_CODING, - agent=agent or AgentType.TIMMY, - issue_number=issue_number, - status=DispatchStatus.FAILED, - error="`title` is required.", - ) - return None - - -def _select_dispatch_strategy(agent: AgentType, issue_number: int | None) -> str: - """Select the dispatch strategy based on agent interface and context. - - Args: - agent: The target agent. - issue_number: Optional Gitea issue number. - - Returns: - Strategy name: "gitea", "api", or "local". - """ - spec = AGENT_REGISTRY[agent] - if spec.interface == "gitea" and issue_number is not None: - return "gitea" - if spec.interface == "api": - return "api" - return "local" - - -def _log_dispatch_result( - title: str, - result: DispatchResult, - attempt: int, - max_retries: int, -) -> None: - """Log the outcome of a dispatch attempt. - - Args: - title: Task title for logging context. - result: The dispatch result. - attempt: Current attempt number (0-indexed). - max_retries: Maximum retry attempts allowed. - """ - if result.success: - return - - if attempt > 0: - logger.info("Retry %d/%d for task %r", attempt, max_retries, title[:60]) - - logger.warning( - "Dispatch attempt %d failed for task %r: %s", - attempt + 1, - title[:60], - result.error, - ) - - -async def dispatch_task( - title: str, - description: str = "", - acceptance_criteria: list[str] | None = None, - task_type: TaskType | None = None, - agent: AgentType | None = None, - issue_number: int | None = None, - api_endpoint: str | None = None, - max_retries: int = 1, -) -> DispatchResult: - """Route a task to the best available agent. - - This is the primary entry point. Callers can either specify the - *agent* and *task_type* explicitly or let the dispatcher infer them - from the *title* and *description*. - - Args: - title: Short human-readable task title. - description: Full task description with context. - acceptance_criteria: List of acceptance criteria strings. - task_type: Override automatic task type inference. - agent: Override automatic agent selection. - issue_number: Gitea issue number to log the assignment on. - api_endpoint: Override API endpoint for AGENT_API dispatches. - max_retries: Number of retry attempts on failure (default 1). - - Returns: - :class:`DispatchResult` describing the final dispatch outcome. - - Example:: - - result = await dispatch_task( - issue_number=1072, - title="Build the cascade LLM router", - description="We need automatic failover...", - acceptance_criteria=["Circuit breaker works", "Metrics exposed"], - ) - if result.success: - print(f"Assigned to {result.agent.value}") - """ - # 1. Validate - validation_error = _validate_task(title, task_type, agent, issue_number) - if validation_error: - return validation_error - - # 2. Resolve task type and agent - criteria = acceptance_criteria or [] - resolved_type = task_type or infer_task_type(title, description) - resolved_agent = agent or select_agent(resolved_type) - - logger.info( - "Dispatching task %r → %s (type=%s, issue=#%s)", - title[:60], - resolved_agent.value, - resolved_type.value, - issue_number, - ) - - # 3. Select strategy and dispatch with retries - strategy = _select_dispatch_strategy(resolved_agent, issue_number) - last_result: DispatchResult | None = None - - for attempt in range(max_retries + 1): - if strategy == "gitea": - result = await _dispatch_via_gitea( - resolved_agent, issue_number, title, description, criteria - ) - elif strategy == "api": - result = await _dispatch_via_api( - resolved_agent, title, description, criteria, issue_number, api_endpoint - ) - else: - result = await _dispatch_local(title, description, criteria, issue_number) - - result.retry_count = attempt - last_result = result - - if result.success: - return result - - _log_dispatch_result(title, result, attempt, max_retries) - - # 4. All attempts exhausted — escalate - assert last_result is not None - last_result.status = DispatchStatus.ESCALATED - logger.error( - "Task %r escalated after %d failed attempt(s): %s", - title[:60], - max_retries + 1, - last_result.error, - ) - - # Try to log the escalation on the issue - if issue_number is not None: - await _log_escalation(issue_number, resolved_agent, last_result.error or "unknown error") - - return last_result - - -async def _log_escalation( - issue_number: int, - agent: AgentType, - error: str, -) -> None: - """Post an escalation notice on the Gitea issue.""" - try: - import httpx - - if not settings.gitea_enabled or not settings.gitea_token: - return - - base_url = f"{settings.gitea_url}/api/v1" - repo = settings.gitea_repo - headers = { - "Authorization": f"token {settings.gitea_token}", - "Content-Type": "application/json", - } - body = ( - f"## Dispatch Escalated\n\n" - f"Could not assign to **{AGENT_REGISTRY[agent].display_name}** " - f"after {1} attempt(s).\n\n" - f"**Error:** {error}\n\n" - f"Manual intervention required.\n\n" - f"---\n*Timmy agent dispatcher.*" - ) - async with httpx.AsyncClient(timeout=10) as client: - await _post_gitea_comment(client, base_url, repo, headers, issue_number, body) - except Exception as exc: - logger.warning("Failed to post escalation comment: %s", exc) - - -# --------------------------------------------------------------------------- -# Monitoring helper -# --------------------------------------------------------------------------- - - -async def wait_for_completion( - issue_number: int, - poll_interval: int = 60, - max_wait: int = 7200, -) -> DispatchStatus: - """Block until the assigned Gitea issue is closed or the timeout fires. - - Useful for synchronous orchestration where the caller wants to wait for - the assigned agent to finish before proceeding. - - Args: - issue_number: Gitea issue to monitor. - poll_interval: Seconds between status polls. - max_wait: Maximum wait in seconds (default 2 hours). - - Returns: - :attr:`DispatchStatus.COMPLETED` or :attr:`DispatchStatus.TIMED_OUT`. - """ - return await _poll_issue_completion(issue_number, poll_interval, max_wait) +from timmy.dispatch import * # noqa: F401, F403