Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
2ffbf9cc96 fix: Enable xdist with coverage in tox.ini Fixes #932 2026-03-23 22:12:26 -04:00
Alexander Whitestone
f0bf778197 WIP: Gemini Code progress on #932
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 14:34:27 -04:00
10 changed files with 20 additions and 1909 deletions

View File

@@ -25,19 +25,6 @@ providers:
tier: local
url: "http://localhost:11434"
models:
# ── Dual-model routing: Qwen3-8B (fast) + Qwen3-14B (quality) ──────────
# Both models fit simultaneously: ~6.6 GB + ~10.5 GB = ~17 GB combined.
# Requires OLLAMA_MAX_LOADED_MODELS=2 (set in .env) to stay hot.
# Ref: issue #1065 — Qwen3-8B/14B dual-model routing strategy
- name: qwen3:8b
context_window: 32768
capabilities: [text, tools, json, streaming, routine]
description: "Qwen3-8B Q6_K — fast router for routine tasks (~6.6 GB, 45-55 tok/s)"
- name: qwen3:14b
context_window: 40960
capabilities: [text, tools, json, streaming, complex, reasoning]
description: "Qwen3-14B Q5_K_M — complex reasoning and planning (~10.5 GB, 20-28 tok/s)"
# Text + Tools models
- name: qwen3:30b
default: true
@@ -200,20 +187,6 @@ fallback_chains:
- dolphin3 # base Dolphin 3.0 8B (uncensored, no custom system prompt)
- qwen3:30b # primary fallback — usually sufficient with a good system prompt
# ── Complexity-based routing chains (issue #1065) ───────────────────────
# Routine tasks: prefer Qwen3-8B for low latency (~45-55 tok/s)
routine:
- qwen3:8b # Primary fast model
- llama3.1:8b-instruct # Fallback fast model
- llama3.2:3b # Smallest available
# Complex tasks: prefer Qwen3-14B for quality (~20-28 tok/s)
complex:
- qwen3:14b # Primary quality model
- hermes4-14b # Native tool calling, hybrid reasoning
- qwen3:30b # Highest local quality
- qwen2.5:14b # Additional fallback
# ── Custom Models ───────────────────────────────────────────────────────────
# Register custom model weights for per-agent assignment.
# Supports GGUF (Ollama), safetensors, and HuggingFace checkpoint dirs.

View File

@@ -41,13 +41,6 @@ class Settings(BaseSettings):
# 4096 keeps memory at ~19GB. Set to 0 to use model defaults.
ollama_num_ctx: int = 4096
# Maximum models loaded simultaneously in Ollama — override with OLLAMA_MAX_LOADED_MODELS
# Set to 2 so Qwen3-8B and Qwen3-14B can stay hot concurrently (~17 GB combined).
# Requires Ollama ≥ 0.1.33. Export this to the Ollama process environment:
# OLLAMA_MAX_LOADED_MODELS=2 ollama serve
# or add it to your systemd/launchd unit before starting the harness.
ollama_max_loaded_models: int = 2
# Fallback model chains — override with FALLBACK_MODELS / VISION_FALLBACK_MODELS
# as comma-separated strings, e.g. FALLBACK_MODELS="qwen3:30b,llama3.1"
# Or edit config/providers.yaml → fallback_chains for the canonical source.

View File

@@ -2,7 +2,6 @@
from .api import router
from .cascade import CascadeRouter, Provider, ProviderStatus, get_router
from .classifier import TaskComplexity, classify_task
from .history import HealthHistoryStore, get_history_store
__all__ = [
@@ -13,6 +12,4 @@ __all__ = [
"router",
"HealthHistoryStore",
"get_history_store",
"TaskComplexity",
"classify_task",
]

View File

@@ -528,34 +528,6 @@ class CascadeRouter:
return True
def _get_model_for_complexity(
self, provider: Provider, complexity: "TaskComplexity"
) -> str | None:
"""Return the best model on *provider* for the given complexity tier.
Checks fallback chains first (routine / complex), then falls back to
any model with the matching capability tag, then the provider default.
"""
from infrastructure.router.classifier import TaskComplexity
chain_key = "routine" if complexity == TaskComplexity.SIMPLE else "complex"
# Walk the capability fallback chain — first model present on this provider wins
for model_name in self.config.fallback_chains.get(chain_key, []):
if any(m["name"] == model_name for m in provider.models):
return model_name
# Direct capability lookup — only return if a model explicitly has the tag
# (do not use get_model_with_capability here as it falls back to the default)
cap_model = next(
(m["name"] for m in provider.models if chain_key in m.get("capabilities", [])),
None,
)
if cap_model:
return cap_model
return None # Caller will use provider default
async def complete(
self,
messages: list[dict],
@@ -563,7 +535,6 @@ class CascadeRouter:
temperature: float = 0.7,
max_tokens: int | None = None,
cascade_tier: str | None = None,
complexity_hint: str | None = None,
) -> dict:
"""Complete a chat conversation with automatic failover.
@@ -572,48 +543,24 @@ class CascadeRouter:
- Falls back to vision-capable models when needed
- Supports image URLs, paths, and base64 encoding
Complexity-based routing (issue #1065):
- ``complexity_hint="simple"`` → routes to Qwen3-8B (low-latency)
- ``complexity_hint="complex"`` → routes to Qwen3-14B (quality)
- ``complexity_hint=None`` (default) → auto-classifies from messages
Args:
messages: List of message dicts with role and content
model: Preferred model (tries this first; complexity routing is
skipped when an explicit model is given)
model: Preferred model (tries this first, then provider defaults)
temperature: Sampling temperature
max_tokens: Maximum tokens to generate
cascade_tier: If specified, filters providers by this tier.
- "frontier_required": Uses only Anthropic provider for top-tier models.
complexity_hint: "simple", "complex", or None (auto-detect).
Returns:
Dict with content, provider_used, model, latency_ms,
is_fallback_model, and complexity fields.
Dict with content, provider_used, and metrics
Raises:
RuntimeError: If all providers fail
"""
from infrastructure.router.classifier import TaskComplexity, classify_task
content_type = self._detect_content_type(messages)
if content_type != ContentType.TEXT:
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
# Resolve task complexity ─────────────────────────────────────────────
# Skip complexity routing when caller explicitly specifies a model.
complexity: TaskComplexity | None = None
if model is None:
if complexity_hint is not None:
try:
complexity = TaskComplexity(complexity_hint.lower())
except ValueError:
logger.warning("Unknown complexity_hint %r, auto-classifying", complexity_hint)
complexity = classify_task(messages)
else:
complexity = classify_task(messages)
logger.debug("Task complexity: %s", complexity.value)
errors = []
providers = self.providers
@@ -626,6 +573,7 @@ class CascadeRouter:
if not providers:
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
for provider in providers:
if not self._is_provider_available(provider):
continue
@@ -639,21 +587,7 @@ class CascadeRouter:
)
continue
# Complexity-based model selection (only when no explicit model) ──
effective_model = model
if effective_model is None and complexity is not None:
effective_model = self._get_model_for_complexity(provider, complexity)
if effective_model:
logger.debug(
"Complexity routing [%s]: %s%s",
complexity.value,
provider.name,
effective_model,
)
selected_model, is_fallback_model = self._select_model(
provider, effective_model, content_type
)
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
try:
result = await self._attempt_with_retry(
@@ -676,7 +610,6 @@ class CascadeRouter:
"model": result.get("model", selected_model or provider.get_default_model()),
"latency_ms": result.get("latency_ms", 0),
"is_fallback_model": is_fallback_model,
"complexity": complexity.value if complexity is not None else None,
}
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")

View File

@@ -1,166 +0,0 @@
"""Task complexity classifier for Qwen3 dual-model routing.
Classifies incoming tasks as SIMPLE (route to Qwen3-8B for low-latency)
or COMPLEX (route to Qwen3-14B for quality-sensitive work).
Classification is fully heuristic — no LLM inference required.
"""
import re
from enum import Enum
class TaskComplexity(Enum):
"""Task complexity tier for model routing."""
SIMPLE = "simple" # Qwen3-8B Q6_K: routine, latency-sensitive
COMPLEX = "complex" # Qwen3-14B Q5_K_M: quality-sensitive, multi-step
# Keywords strongly associated with complex tasks
_COMPLEX_KEYWORDS: frozenset[str] = frozenset(
[
"plan",
"review",
"analyze",
"analyse",
"triage",
"refactor",
"design",
"architecture",
"implement",
"compare",
"debug",
"explain",
"prioritize",
"prioritise",
"strategy",
"optimize",
"optimise",
"evaluate",
"assess",
"brainstorm",
"outline",
"summarize",
"summarise",
"generate code",
"write a",
"write the",
"code review",
"pull request",
"multi-step",
"multi step",
"step by step",
"backlog prioriti",
"issue triage",
"root cause",
"how does",
"why does",
"what are the",
]
)
# Keywords strongly associated with simple/routine tasks
_SIMPLE_KEYWORDS: frozenset[str] = frozenset(
[
"status",
"list ",
"show ",
"what is",
"how many",
"ping",
"run ",
"execute ",
"ls ",
"cat ",
"ps ",
"fetch ",
"count ",
"tail ",
"head ",
"grep ",
"find file",
"read file",
"get ",
"query ",
"check ",
"yes",
"no",
"ok",
"done",
"thanks",
]
)
# Content longer than this is treated as complex regardless of keywords
_COMPLEX_CHAR_THRESHOLD = 500
# Short content defaults to simple
_SIMPLE_CHAR_THRESHOLD = 150
# More than this many messages suggests an ongoing complex conversation
_COMPLEX_CONVERSATION_DEPTH = 6
def classify_task(messages: list[dict]) -> TaskComplexity:
"""Classify task complexity from a list of messages.
Uses heuristic rules — no LLM call required. Errs toward COMPLEX
when uncertain so that quality is preserved.
Args:
messages: List of message dicts with ``role`` and ``content`` keys.
Returns:
TaskComplexity.SIMPLE or TaskComplexity.COMPLEX
"""
if not messages:
return TaskComplexity.SIMPLE
# Concatenate all user-turn content for analysis
user_content = " ".join(
msg.get("content", "")
for msg in messages
if msg.get("role") in ("user", "human")
and isinstance(msg.get("content"), str)
).lower().strip()
if not user_content:
return TaskComplexity.SIMPLE
# Complexity signals override everything -----------------------------------
# Explicit complex keywords
for kw in _COMPLEX_KEYWORDS:
if kw in user_content:
return TaskComplexity.COMPLEX
# Numbered / multi-step instruction list: "1. do this 2. do that"
if re.search(r"\b\d+\.\s+\w", user_content):
return TaskComplexity.COMPLEX
# Code blocks embedded in messages
if "```" in user_content:
return TaskComplexity.COMPLEX
# Long content → complex reasoning likely required
if len(user_content) > _COMPLEX_CHAR_THRESHOLD:
return TaskComplexity.COMPLEX
# Deep conversation → complex ongoing task
if len(messages) > _COMPLEX_CONVERSATION_DEPTH:
return TaskComplexity.COMPLEX
# Simplicity signals -------------------------------------------------------
# Explicit simple keywords
for kw in _SIMPLE_KEYWORDS:
if kw in user_content:
return TaskComplexity.SIMPLE
# Short single-sentence messages default to simple
if len(user_content) <= _SIMPLE_CHAR_THRESHOLD:
return TaskComplexity.SIMPLE
# When uncertain, prefer quality (complex model)
return TaskComplexity.COMPLEX

View File

@@ -1,801 +0,0 @@
"""Agent dispatcher — route tasks to Claude Code, Kimi, APIs, or Timmy itself.
Timmy's dispatch system: knows what agents are available, what they're good
at, and how to send them work. Uses Gitea labels and issue comments to assign
tasks and track completion.
Dispatch flow:
1. Match task type to agent strengths
2. Check agent availability (idle or working?)
3. Dispatch task with full context (issue link, requirements, criteria)
4. Log assignment as a Gitea comment
5. Monitor for completion or timeout
6. Review output quality
7. If output fails QA → reassign or escalate
Agent interfaces:
- Claude Code → ``claude-ready`` Gitea label + issue comment
- Kimi Code → ``kimi-ready`` Gitea label + issue comment
- Agent APIs → HTTP POST to external endpoint
- Timmy (self) → direct local invocation
Usage::
from timmy.dispatcher import dispatch_task, TaskType, AgentType
result = await dispatch_task(
issue_number=1072,
task_type=TaskType.ARCHITECTURE,
title="Design the LLM router",
description="We need a cascade router...",
acceptance_criteria=["Failover works", "Metrics exposed"],
)
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------
class AgentType(str, Enum):
"""Known agents in the swarm."""
CLAUDE_CODE = "claude_code"
KIMI_CODE = "kimi_code"
AGENT_API = "agent_api"
TIMMY = "timmy"
class TaskType(str, Enum):
"""Categories of engineering work."""
# Claude Code strengths
ARCHITECTURE = "architecture"
REFACTORING = "refactoring"
COMPLEX_REASONING = "complex_reasoning"
CODE_REVIEW = "code_review"
# Kimi Code strengths
PARALLEL_IMPLEMENTATION = "parallel_implementation"
ROUTINE_CODING = "routine_coding"
FAST_ITERATION = "fast_iteration"
# Agent API strengths
RESEARCH = "research"
ANALYSIS = "analysis"
SPECIALIZED = "specialized"
# Timmy strengths
TRIAGE = "triage"
PLANNING = "planning"
CREATIVE = "creative"
ORCHESTRATION = "orchestration"
class DispatchStatus(str, Enum):
"""Lifecycle state of a dispatched task."""
PENDING = "pending"
ASSIGNED = "assigned"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
ESCALATED = "escalated"
TIMED_OUT = "timed_out"
# ---------------------------------------------------------------------------
# Agent registry
# ---------------------------------------------------------------------------
@dataclass
class AgentSpec:
"""Capabilities and limits for a single agent."""
name: AgentType
display_name: str
strengths: frozenset[TaskType]
gitea_label: str | None # label to apply when dispatching
max_concurrent: int = 1
interface: str = "gitea" # "gitea" | "api" | "local"
api_endpoint: str | None = None # for interface="api"
#: Authoritative agent registry — all known agents and their capabilities.
AGENT_REGISTRY: dict[AgentType, AgentSpec] = {
AgentType.CLAUDE_CODE: AgentSpec(
name=AgentType.CLAUDE_CODE,
display_name="Claude Code",
strengths=frozenset(
{
TaskType.ARCHITECTURE,
TaskType.REFACTORING,
TaskType.COMPLEX_REASONING,
TaskType.CODE_REVIEW,
}
),
gitea_label="claude-ready",
max_concurrent=1,
interface="gitea",
),
AgentType.KIMI_CODE: AgentSpec(
name=AgentType.KIMI_CODE,
display_name="Kimi Code",
strengths=frozenset(
{
TaskType.PARALLEL_IMPLEMENTATION,
TaskType.ROUTINE_CODING,
TaskType.FAST_ITERATION,
}
),
gitea_label="kimi-ready",
max_concurrent=1,
interface="gitea",
),
AgentType.AGENT_API: AgentSpec(
name=AgentType.AGENT_API,
display_name="Agent API",
strengths=frozenset(
{
TaskType.RESEARCH,
TaskType.ANALYSIS,
TaskType.SPECIALIZED,
}
),
gitea_label=None,
max_concurrent=5,
interface="api",
),
AgentType.TIMMY: AgentSpec(
name=AgentType.TIMMY,
display_name="Timmy",
strengths=frozenset(
{
TaskType.TRIAGE,
TaskType.PLANNING,
TaskType.CREATIVE,
TaskType.ORCHESTRATION,
}
),
gitea_label=None,
max_concurrent=1,
interface="local",
),
}
#: Map from task type to preferred agent (primary routing table).
_TASK_ROUTING: dict[TaskType, AgentType] = {
TaskType.ARCHITECTURE: AgentType.CLAUDE_CODE,
TaskType.REFACTORING: AgentType.CLAUDE_CODE,
TaskType.COMPLEX_REASONING: AgentType.CLAUDE_CODE,
TaskType.CODE_REVIEW: AgentType.CLAUDE_CODE,
TaskType.PARALLEL_IMPLEMENTATION: AgentType.KIMI_CODE,
TaskType.ROUTINE_CODING: AgentType.KIMI_CODE,
TaskType.FAST_ITERATION: AgentType.KIMI_CODE,
TaskType.RESEARCH: AgentType.AGENT_API,
TaskType.ANALYSIS: AgentType.AGENT_API,
TaskType.SPECIALIZED: AgentType.AGENT_API,
TaskType.TRIAGE: AgentType.TIMMY,
TaskType.PLANNING: AgentType.TIMMY,
TaskType.CREATIVE: AgentType.TIMMY,
TaskType.ORCHESTRATION: AgentType.TIMMY,
}
# ---------------------------------------------------------------------------
# Dispatch result
# ---------------------------------------------------------------------------
@dataclass
class DispatchResult:
"""Outcome of a dispatch call."""
task_type: TaskType
agent: AgentType
issue_number: int | None
status: DispatchStatus
comment_id: int | None = None
label_applied: str | None = None
error: str | None = None
retry_count: int = 0
metadata: dict[str, Any] = field(default_factory=dict)
@property
def success(self) -> bool: # noqa: D401
return self.status in (DispatchStatus.ASSIGNED, DispatchStatus.COMPLETED)
# ---------------------------------------------------------------------------
# Routing logic
# ---------------------------------------------------------------------------
def select_agent(task_type: TaskType) -> AgentType:
"""Return the best agent for *task_type* based on the routing table.
Args:
task_type: The category of engineering work to be done.
Returns:
The :class:`AgentType` best suited to handle this task.
"""
return _TASK_ROUTING.get(task_type, AgentType.TIMMY)
def infer_task_type(title: str, description: str = "") -> TaskType:
"""Heuristic: guess the most appropriate :class:`TaskType` from text.
Scans *title* and *description* for keyword signals and returns the
strongest match. Falls back to :attr:`TaskType.ROUTINE_CODING`.
Args:
title: Short task title.
description: Longer task description (optional).
Returns:
The inferred :class:`TaskType`.
"""
text = (title + " " + description).lower()
_SIGNALS: list[tuple[TaskType, frozenset[str]]] = [
(TaskType.ARCHITECTURE, frozenset({"architect", "design", "adr", "system design", "schema"})),
(TaskType.REFACTORING, frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"})),
(TaskType.CODE_REVIEW, frozenset({"review", "pr review", "pull request review", "audit"})),
(TaskType.COMPLEX_REASONING, frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"})),
(TaskType.RESEARCH, frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"})),
(TaskType.ANALYSIS, frozenset({"analysis", "profil", "trace", "metric", "performance"})),
(TaskType.TRIAGE, frozenset({"triage", "classify", "prioritise", "prioritize"})),
(TaskType.PLANNING, frozenset({"plan", "roadmap", "milestone", "epic", "spike"})),
(TaskType.CREATIVE, frozenset({"creative", "persona", "story", "write", "draft"})),
(TaskType.ORCHESTRATION, frozenset({"orchestrat", "coordinat", "swarm", "dispatch"})),
(TaskType.PARALLEL_IMPLEMENTATION, frozenset({"parallel", "concurrent", "batch"})),
(TaskType.FAST_ITERATION, frozenset({"quick", "fast", "iterate", "prototype", "poc"})),
]
for task_type, keywords in _SIGNALS:
if any(kw in text for kw in keywords):
return task_type
return TaskType.ROUTINE_CODING
# ---------------------------------------------------------------------------
# Gitea helpers
# ---------------------------------------------------------------------------
async def _post_gitea_comment(
client: Any,
base_url: str,
repo: str,
headers: dict[str, str],
issue_number: int,
body: str,
) -> int | None:
"""Post a comment on a Gitea issue and return the comment ID."""
try:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
json={"body": body},
)
if resp.status_code in (200, 201):
return resp.json().get("id")
logger.warning(
"Comment on #%s returned %s: %s",
issue_number,
resp.status_code,
resp.text[:200],
)
except Exception as exc:
logger.warning("Failed to post comment on #%s: %s", issue_number, exc)
return None
async def _apply_gitea_label(
client: Any,
base_url: str,
repo: str,
headers: dict[str, str],
issue_number: int,
label_name: str,
label_color: str = "#0075ca",
) -> bool:
"""Ensure *label_name* exists and apply it to an issue.
Returns True if the label was successfully applied.
"""
# Resolve or create the label
label_id: int | None = None
try:
resp = await client.get(f"{base_url}/repos/{repo}/labels", headers=headers)
if resp.status_code == 200:
for lbl in resp.json():
if lbl.get("name") == label_name:
label_id = lbl["id"]
break
except Exception as exc:
logger.warning("Failed to list labels: %s", exc)
return False
if label_id is None:
try:
resp = await client.post(
f"{base_url}/repos/{repo}/labels",
headers=headers,
json={"name": label_name, "color": label_color},
)
if resp.status_code in (200, 201):
label_id = resp.json().get("id")
except Exception as exc:
logger.warning("Failed to create label %r: %s", label_name, exc)
return False
if label_id is None:
return False
# Apply label to the issue
try:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
headers=headers,
json={"labels": [label_id]},
)
return resp.status_code in (200, 201)
except Exception as exc:
logger.warning("Failed to apply label %r to #%s: %s", label_name, issue_number, exc)
return False
async def _poll_issue_completion(
issue_number: int,
poll_interval: int = 60,
max_wait: int = 7200,
) -> DispatchStatus:
"""Poll a Gitea issue until closed (completed) or timeout.
Args:
issue_number: Gitea issue to watch.
poll_interval: Seconds between polls.
max_wait: Maximum total seconds to wait.
Returns:
:attr:`DispatchStatus.COMPLETED` if the issue was closed,
:attr:`DispatchStatus.TIMED_OUT` otherwise.
"""
try:
import httpx
except ImportError as exc:
logger.warning("poll_issue_completion: missing dependency: %s", exc)
return DispatchStatus.FAILED
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"
elapsed = 0
while elapsed < max_wait:
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(issue_url, headers=headers)
if resp.status_code == 200 and resp.json().get("state") == "closed":
logger.info("Issue #%s closed — task completed", issue_number)
return DispatchStatus.COMPLETED
except Exception as exc:
logger.warning("Poll error for issue #%s: %s", issue_number, exc)
await asyncio.sleep(poll_interval)
elapsed += poll_interval
logger.warning("Timed out waiting for issue #%s after %ss", issue_number, max_wait)
return DispatchStatus.TIMED_OUT
# ---------------------------------------------------------------------------
# Core dispatch functions
# ---------------------------------------------------------------------------
async def _dispatch_via_gitea(
agent: AgentType,
issue_number: int,
title: str,
description: str,
acceptance_criteria: list[str],
) -> DispatchResult:
"""Assign a task by applying a Gitea label and posting an assignment comment.
Args:
agent: Target agent.
issue_number: Gitea issue to assign.
title: Short task title.
description: Full task description.
acceptance_criteria: List of acceptance criteria strings.
Returns:
:class:`DispatchResult` describing the outcome.
"""
try:
import httpx
except ImportError as exc:
return DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error=f"Missing dependency: {exc}",
)
spec = AGENT_REGISTRY[agent]
task_type = infer_task_type(title, description)
if not settings.gitea_enabled or not settings.gitea_token:
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error="Gitea integration not configured (no token or disabled).",
)
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
comment_id: int | None = None
label_applied: str | None = None
async with httpx.AsyncClient(timeout=15) as client:
# 1. Apply agent label (if applicable)
if spec.gitea_label:
ok = await _apply_gitea_label(
client, base_url, repo, headers, issue_number, spec.gitea_label
)
if ok:
label_applied = spec.gitea_label
logger.info(
"Applied label %r to issue #%s for %s",
spec.gitea_label,
issue_number,
spec.display_name,
)
else:
logger.warning(
"Could not apply label %r to issue #%s",
spec.gitea_label,
issue_number,
)
# 2. Post assignment comment
criteria_md = "\n".join(f"- {c}" for c in acceptance_criteria) if acceptance_criteria else "_None specified_"
comment_body = (
f"## Assigned to {spec.display_name}\n\n"
f"**Task type:** `{task_type.value}`\n\n"
f"**Description:**\n{description}\n\n"
f"**Acceptance criteria:**\n{criteria_md}\n\n"
f"---\n*Dispatched by Timmy agent dispatcher.*"
)
comment_id = await _post_gitea_comment(
client, base_url, repo, headers, issue_number, comment_body
)
if comment_id is not None or label_applied is not None:
logger.info(
"Dispatched issue #%s to %s (label=%r, comment=%s)",
issue_number,
spec.display_name,
label_applied,
comment_id,
)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.ASSIGNED,
comment_id=comment_id,
label_applied=label_applied,
)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error="Failed to apply label and post comment — check Gitea connectivity.",
)
async def _dispatch_via_api(
agent: AgentType,
title: str,
description: str,
acceptance_criteria: list[str],
issue_number: int | None = None,
endpoint: str | None = None,
) -> DispatchResult:
"""Dispatch a task to an external HTTP API agent.
Args:
agent: Target agent.
title: Short task title.
description: Task description.
acceptance_criteria: List of acceptance criteria.
issue_number: Optional Gitea issue for cross-referencing.
endpoint: Override API endpoint URL (uses spec default if omitted).
Returns:
:class:`DispatchResult` describing the outcome.
"""
spec = AGENT_REGISTRY[agent]
task_type = infer_task_type(title, description)
url = endpoint or spec.api_endpoint
if not url:
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error=f"No API endpoint configured for agent {agent.value}.",
)
payload = {
"title": title,
"description": description,
"acceptance_criteria": acceptance_criteria,
"issue_number": issue_number,
"agent": agent.value,
"task_type": task_type.value,
}
try:
import httpx
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(url, json=payload)
if resp.status_code in (200, 201, 202):
logger.info("Dispatched %r to API agent %s at %s", title[:60], agent.value, url)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.ASSIGNED,
metadata={"response": resp.json() if resp.content else {}},
)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error=f"API agent returned {resp.status_code}: {resp.text[:200]}",
)
except Exception as exc:
logger.warning("API dispatch to %s failed: %s", url, exc)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error=str(exc),
)
async def _dispatch_local(
title: str,
description: str = "",
acceptance_criteria: list[str] | None = None,
issue_number: int | None = None,
) -> DispatchResult:
"""Handle a task locally — Timmy processes it directly.
This is a lightweight stub. Real local execution should be wired
into the agentic loop or a dedicated Timmy tool.
Args:
title: Short task title.
description: Task description.
acceptance_criteria: Acceptance criteria list.
issue_number: Optional Gitea issue number for logging.
Returns:
:class:`DispatchResult` with ASSIGNED status (local execution is
assumed to succeed at dispatch time).
"""
task_type = infer_task_type(title, description)
logger.info(
"Timmy handling task locally: %r (issue #%s)", title[:60], issue_number
)
return DispatchResult(
task_type=task_type,
agent=AgentType.TIMMY,
issue_number=issue_number,
status=DispatchStatus.ASSIGNED,
metadata={"local": True, "description": description},
)
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
async def dispatch_task(
title: str,
description: str = "",
acceptance_criteria: list[str] | None = None,
task_type: TaskType | None = None,
agent: AgentType | None = None,
issue_number: int | None = None,
api_endpoint: str | None = None,
max_retries: int = 1,
) -> DispatchResult:
"""Route a task to the best available agent.
This is the primary entry point. Callers can either specify the
*agent* and *task_type* explicitly or let the dispatcher infer them
from the *title* and *description*.
Args:
title: Short human-readable task title.
description: Full task description with context.
acceptance_criteria: List of acceptance criteria strings.
task_type: Override automatic task type inference.
agent: Override automatic agent selection.
issue_number: Gitea issue number to log the assignment on.
api_endpoint: Override API endpoint for AGENT_API dispatches.
max_retries: Number of retry attempts on failure (default 1).
Returns:
:class:`DispatchResult` describing the final dispatch outcome.
Example::
result = await dispatch_task(
issue_number=1072,
title="Build the cascade LLM router",
description="We need automatic failover...",
acceptance_criteria=["Circuit breaker works", "Metrics exposed"],
)
if result.success:
print(f"Assigned to {result.agent.value}")
"""
criteria = acceptance_criteria or []
if not title.strip():
return DispatchResult(
task_type=task_type or TaskType.ROUTINE_CODING,
agent=agent or AgentType.TIMMY,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error="`title` is required.",
)
resolved_type = task_type or infer_task_type(title, description)
resolved_agent = agent or select_agent(resolved_type)
logger.info(
"Dispatching task %r%s (type=%s, issue=#%s)",
title[:60],
resolved_agent.value,
resolved_type.value,
issue_number,
)
spec = AGENT_REGISTRY[resolved_agent]
last_result: DispatchResult | None = None
for attempt in range(max_retries + 1):
if attempt > 0:
logger.info("Retry %d/%d for task %r", attempt, max_retries, title[:60])
if spec.interface == "gitea" and issue_number is not None:
result = await _dispatch_via_gitea(
resolved_agent, issue_number, title, description, criteria
)
elif spec.interface == "api":
result = await _dispatch_via_api(
resolved_agent, title, description, criteria, issue_number, api_endpoint
)
else:
result = await _dispatch_local(title, description, criteria, issue_number)
result.retry_count = attempt
last_result = result
if result.success:
return result
logger.warning(
"Dispatch attempt %d failed for task %r: %s",
attempt + 1,
title[:60],
result.error,
)
# All attempts exhausted — escalate
assert last_result is not None
last_result.status = DispatchStatus.ESCALATED
logger.error(
"Task %r escalated after %d failed attempt(s): %s",
title[:60],
max_retries + 1,
last_result.error,
)
# Try to log the escalation on the issue
if issue_number is not None:
await _log_escalation(issue_number, resolved_agent, last_result.error or "unknown error")
return last_result
async def _log_escalation(
issue_number: int,
agent: AgentType,
error: str,
) -> None:
"""Post an escalation notice on the Gitea issue."""
try:
import httpx
if not settings.gitea_enabled or not settings.gitea_token:
return
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
body = (
f"## Dispatch Escalated\n\n"
f"Could not assign to **{AGENT_REGISTRY[agent].display_name}** "
f"after {1} attempt(s).\n\n"
f"**Error:** {error}\n\n"
f"Manual intervention required.\n\n"
f"---\n*Timmy agent dispatcher.*"
)
async with httpx.AsyncClient(timeout=10) as client:
await _post_gitea_comment(
client, base_url, repo, headers, issue_number, body
)
except Exception as exc:
logger.warning("Failed to post escalation comment: %s", exc)
# ---------------------------------------------------------------------------
# Monitoring helper
# ---------------------------------------------------------------------------
async def wait_for_completion(
issue_number: int,
poll_interval: int = 60,
max_wait: int = 7200,
) -> DispatchStatus:
"""Block until the assigned Gitea issue is closed or the timeout fires.
Useful for synchronous orchestration where the caller wants to wait for
the assigned agent to finish before proceeding.
Args:
issue_number: Gitea issue to monitor.
poll_interval: Seconds between status polls.
max_wait: Maximum wait in seconds (default 2 hours).
Returns:
:attr:`DispatchStatus.COMPLETED` or :attr:`DispatchStatus.TIMED_OUT`.
"""
return await _poll_issue_completion(issue_number, poll_interval, max_wait)

View File

@@ -968,195 +968,3 @@ class TestCascadeRouterReload:
assert router.providers[0].name == "low-priority"
assert router.providers[1].name == "high-priority"
class TestComplexityRouting:
"""Tests for Qwen3-8B / Qwen3-14B dual-model routing (issue #1065)."""
def _make_dual_model_provider(self) -> Provider:
"""Build an Ollama provider with both Qwen3 models registered."""
return Provider(
name="ollama-local",
type="ollama",
enabled=True,
priority=1,
url="http://localhost:11434",
models=[
{
"name": "qwen3:8b",
"capabilities": ["text", "tools", "json", "streaming", "routine"],
},
{
"name": "qwen3:14b",
"default": True,
"capabilities": ["text", "tools", "json", "streaming", "complex", "reasoning"],
},
],
)
def test_get_model_for_complexity_simple_returns_8b(self):
"""Simple tasks should select the model with 'routine' capability."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
provider = self._make_dual_model_provider()
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
assert model == "qwen3:8b"
def test_get_model_for_complexity_complex_returns_14b(self):
"""Complex tasks should select the model with 'complex' capability."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
provider = self._make_dual_model_provider()
model = router._get_model_for_complexity(provider, TaskComplexity.COMPLEX)
assert model == "qwen3:14b"
def test_get_model_for_complexity_returns_none_when_no_match(self):
"""Returns None when provider has no matching model in chain."""
from infrastructure.router.classifier import TaskComplexity
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {} # empty chains
provider = Provider(
name="test",
type="ollama",
enabled=True,
priority=1,
models=[{"name": "llama3.2:3b", "default": True, "capabilities": ["text"]}],
)
# No 'routine' or 'complex' model available
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
assert model is None
@pytest.mark.asyncio
async def test_complete_with_simple_hint_routes_to_8b(self):
"""complexity_hint='simple' should use qwen3:8b."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "fast answer", "model": "qwen3:8b"}
result = await router.complete(
messages=[{"role": "user", "content": "list tasks"}],
complexity_hint="simple",
)
assert result["model"] == "qwen3:8b"
assert result["complexity"] == "simple"
@pytest.mark.asyncio
async def test_complete_with_complex_hint_routes_to_14b(self):
"""complexity_hint='complex' should use qwen3:14b."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "detailed answer", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "review this PR"}],
complexity_hint="complex",
)
assert result["model"] == "qwen3:14b"
assert result["complexity"] == "complex"
@pytest.mark.asyncio
async def test_explicit_model_bypasses_complexity_routing(self):
"""When model is explicitly provided, complexity routing is skipped."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "response", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "list tasks"}],
model="qwen3:14b", # explicit override
)
# Explicit model wins — complexity field is None
assert result["model"] == "qwen3:14b"
assert result["complexity"] is None
@pytest.mark.asyncio
async def test_auto_classification_routes_simple_message(self):
"""Short, simple messages should auto-classify as SIMPLE → 8B."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
result = await router.complete(
messages=[{"role": "user", "content": "status"}],
# no complexity_hint — auto-classify
)
assert result["complexity"] == "simple"
assert result["model"] == "qwen3:8b"
@pytest.mark.asyncio
async def test_auto_classification_routes_complex_message(self):
"""Complex messages should auto-classify → 14B."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "deep analysis", "model": "qwen3:14b"}
result = await router.complete(
messages=[{"role": "user", "content": "analyze and prioritize the backlog"}],
)
assert result["complexity"] == "complex"
assert result["model"] == "qwen3:14b"
@pytest.mark.asyncio
async def test_invalid_complexity_hint_falls_back_to_auto(self):
"""Invalid complexity_hint should log a warning and auto-classify."""
router = CascadeRouter(config_path=Path("/nonexistent"))
router.config.fallback_chains = {
"routine": ["qwen3:8b"],
"complex": ["qwen3:14b"],
}
router.providers = [self._make_dual_model_provider()]
with patch.object(router, "_call_ollama") as mock_call:
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
# Should not raise
result = await router.complete(
messages=[{"role": "user", "content": "status"}],
complexity_hint="INVALID_HINT",
)
assert result["complexity"] in ("simple", "complex") # auto-classified

View File

@@ -1,134 +0,0 @@
"""Tests for Qwen3 dual-model task complexity classifier."""
import pytest
from infrastructure.router.classifier import TaskComplexity, classify_task
class TestClassifyTask:
"""Tests for classify_task heuristics."""
# ── Simple / routine tasks ──────────────────────────────────────────────
def test_empty_messages_is_simple(self):
assert classify_task([]) == TaskComplexity.SIMPLE
def test_no_user_content_is_simple(self):
messages = [{"role": "system", "content": "You are Timmy."}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_short_status_query_is_simple(self):
messages = [{"role": "user", "content": "status"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_list_command_is_simple(self):
messages = [{"role": "user", "content": "list all tasks"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_get_command_is_simple(self):
messages = [{"role": "user", "content": "get the latest log entry"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_short_message_under_threshold_is_simple(self):
messages = [{"role": "user", "content": "run the build"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
def test_affirmation_is_simple(self):
messages = [{"role": "user", "content": "yes"}]
assert classify_task(messages) == TaskComplexity.SIMPLE
# ── Complex / quality-sensitive tasks ──────────────────────────────────
def test_plan_keyword_is_complex(self):
messages = [{"role": "user", "content": "plan the sprint"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_review_keyword_is_complex(self):
messages = [{"role": "user", "content": "review this code"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_analyze_keyword_is_complex(self):
messages = [{"role": "user", "content": "analyze performance"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_triage_keyword_is_complex(self):
messages = [{"role": "user", "content": "triage the open issues"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_refactor_keyword_is_complex(self):
messages = [{"role": "user", "content": "refactor the auth module"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_explain_keyword_is_complex(self):
messages = [{"role": "user", "content": "explain how the router works"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_prioritize_keyword_is_complex(self):
messages = [{"role": "user", "content": "prioritize the backlog"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_long_message_is_complex(self):
long_msg = "do something " * 50 # > 500 chars
messages = [{"role": "user", "content": long_msg}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_numbered_list_is_complex(self):
messages = [
{
"role": "user",
"content": "1. Read the file 2. Analyze it 3. Write a report",
}
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_code_block_is_complex(self):
messages = [
{"role": "user", "content": "Here is the code:\n```python\nprint('hello')\n```"}
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_deep_conversation_is_complex(self):
messages = [
{"role": "user", "content": "hi"},
{"role": "assistant", "content": "hello"},
{"role": "user", "content": "ok"},
{"role": "assistant", "content": "yes"},
{"role": "user", "content": "ok"},
{"role": "assistant", "content": "yes"},
{"role": "user", "content": "now do the thing"},
]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_analyse_british_spelling_is_complex(self):
messages = [{"role": "user", "content": "analyse this dataset"}]
assert classify_task(messages) == TaskComplexity.COMPLEX
def test_non_string_content_is_ignored(self):
"""Non-string content should not crash the classifier."""
messages = [{"role": "user", "content": ["part1", "part2"]}]
# Should not raise; result doesn't matter — just must not blow up
result = classify_task(messages)
assert isinstance(result, TaskComplexity)
def test_system_message_not_counted_as_user(self):
"""System message alone should not trigger complex keywords."""
messages = [
{"role": "system", "content": "analyze everything carefully"},
{"role": "user", "content": "yes"},
]
# "analyze" is in system message (not user) — user says "yes" → simple
assert classify_task(messages) == TaskComplexity.SIMPLE
class TestTaskComplexityEnum:
"""Tests for TaskComplexity enum values."""
def test_simple_value(self):
assert TaskComplexity.SIMPLE.value == "simple"
def test_complex_value(self):
assert TaskComplexity.COMPLEX.value == "complex"
def test_lookup_by_value(self):
assert TaskComplexity("simple") == TaskComplexity.SIMPLE
assert TaskComplexity("complex") == TaskComplexity.COMPLEX

View File

@@ -1,503 +0,0 @@
"""Tests for the agent dispatcher (timmy.dispatcher)."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.dispatcher import (
AGENT_REGISTRY,
AgentType,
DispatchResult,
DispatchStatus,
TaskType,
_dispatch_local,
_dispatch_via_api,
_dispatch_via_gitea,
dispatch_task,
infer_task_type,
select_agent,
wait_for_completion,
)
# ---------------------------------------------------------------------------
# Agent registry
# ---------------------------------------------------------------------------
class TestAgentRegistry:
def test_all_agents_present(self):
for member in AgentType:
assert member in AGENT_REGISTRY, f"AgentType.{member.name} missing from registry"
def test_agent_specs_have_display_names(self):
for agent, spec in AGENT_REGISTRY.items():
assert spec.display_name, f"{agent} has empty display_name"
def test_gitea_agents_have_labels(self):
for agent, spec in AGENT_REGISTRY.items():
if spec.interface == "gitea":
assert spec.gitea_label, f"{agent} is gitea interface but has no label"
def test_non_gitea_agents_have_no_labels(self):
for agent, spec in AGENT_REGISTRY.items():
if spec.interface not in ("gitea",):
# api and local agents may have no label
assert spec.gitea_label is None or spec.interface == "gitea"
def test_max_concurrent_positive(self):
for agent, spec in AGENT_REGISTRY.items():
assert spec.max_concurrent >= 1, f"{agent} has max_concurrent < 1"
# ---------------------------------------------------------------------------
# select_agent
# ---------------------------------------------------------------------------
class TestSelectAgent:
def test_architecture_routes_to_claude(self):
assert select_agent(TaskType.ARCHITECTURE) == AgentType.CLAUDE_CODE
def test_refactoring_routes_to_claude(self):
assert select_agent(TaskType.REFACTORING) == AgentType.CLAUDE_CODE
def test_code_review_routes_to_claude(self):
assert select_agent(TaskType.CODE_REVIEW) == AgentType.CLAUDE_CODE
def test_routine_coding_routes_to_kimi(self):
assert select_agent(TaskType.ROUTINE_CODING) == AgentType.KIMI_CODE
def test_fast_iteration_routes_to_kimi(self):
assert select_agent(TaskType.FAST_ITERATION) == AgentType.KIMI_CODE
def test_research_routes_to_agent_api(self):
assert select_agent(TaskType.RESEARCH) == AgentType.AGENT_API
def test_triage_routes_to_timmy(self):
assert select_agent(TaskType.TRIAGE) == AgentType.TIMMY
def test_planning_routes_to_timmy(self):
assert select_agent(TaskType.PLANNING) == AgentType.TIMMY
# ---------------------------------------------------------------------------
# infer_task_type
# ---------------------------------------------------------------------------
class TestInferTaskType:
def test_architecture_keyword(self):
assert infer_task_type("Design the LLM router architecture") == TaskType.ARCHITECTURE
def test_refactor_keyword(self):
assert infer_task_type("Refactor the auth middleware") == TaskType.REFACTORING
def test_code_review_keyword(self):
assert infer_task_type("Review PR for cascade router") == TaskType.CODE_REVIEW
def test_research_keyword(self):
assert infer_task_type("Research embedding models") == TaskType.RESEARCH
def test_triage_keyword(self):
assert infer_task_type("Triage open issues") == TaskType.TRIAGE
def test_planning_keyword(self):
assert infer_task_type("Plan the v2.0 roadmap") == TaskType.PLANNING
def test_fallback_returns_routine_coding(self):
assert infer_task_type("Do the thing") == TaskType.ROUTINE_CODING
def test_description_contributes_to_inference(self):
result = infer_task_type("Implement feature", "We need to refactor the old code")
assert result == TaskType.REFACTORING
def test_case_insensitive(self):
assert infer_task_type("ARCHITECTURE DESIGN") == TaskType.ARCHITECTURE
# ---------------------------------------------------------------------------
# DispatchResult
# ---------------------------------------------------------------------------
class TestDispatchResult:
def test_success_when_assigned(self):
r = DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=AgentType.KIMI_CODE,
issue_number=1,
status=DispatchStatus.ASSIGNED,
)
assert r.success is True
def test_success_when_completed(self):
r = DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=AgentType.KIMI_CODE,
issue_number=1,
status=DispatchStatus.COMPLETED,
)
assert r.success is True
def test_not_success_when_failed(self):
r = DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=AgentType.KIMI_CODE,
issue_number=1,
status=DispatchStatus.FAILED,
)
assert r.success is False
def test_not_success_when_escalated(self):
r = DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=AgentType.KIMI_CODE,
issue_number=1,
status=DispatchStatus.ESCALATED,
)
assert r.success is False
# ---------------------------------------------------------------------------
# _dispatch_local
# ---------------------------------------------------------------------------
class TestDispatchLocal:
async def test_returns_assigned(self):
result = await _dispatch_local(
title="Plan the migration",
description="We need a plan.",
acceptance_criteria=["Plan is documented"],
issue_number=42,
)
assert result.status == DispatchStatus.ASSIGNED
assert result.agent == AgentType.TIMMY
assert result.issue_number == 42
async def test_infers_task_type(self):
result = await _dispatch_local(
title="Plan the sprint",
description="",
acceptance_criteria=[],
)
assert result.task_type == TaskType.PLANNING
async def test_no_issue_number(self):
result = await _dispatch_local(title="Do something", description="")
assert result.issue_number is None
# ---------------------------------------------------------------------------
# _dispatch_via_api
# ---------------------------------------------------------------------------
class TestDispatchViaApi:
async def test_no_endpoint_returns_failed(self):
result = await _dispatch_via_api(
agent=AgentType.AGENT_API,
title="Analyse logs",
description="",
acceptance_criteria=[],
)
assert result.status == DispatchStatus.FAILED
assert "No API endpoint" in (result.error or "")
async def test_successful_api_call(self):
mock_resp = MagicMock()
mock_resp.status_code = 202
mock_resp.content = b'{"ok": true}'
mock_resp.json.return_value = {"ok": True}
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(return_value=mock_resp)
with patch("httpx.AsyncClient", return_value=mock_client):
result = await _dispatch_via_api(
agent=AgentType.AGENT_API,
title="Analyse logs",
description="Look at the logs",
acceptance_criteria=["Report produced"],
endpoint="http://fake-agent/dispatch",
)
assert result.status == DispatchStatus.ASSIGNED
assert result.agent == AgentType.AGENT_API
async def test_api_error_returns_failed(self):
mock_resp = MagicMock()
mock_resp.status_code = 500
mock_resp.text = "Internal Server Error"
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(return_value=mock_resp)
with patch("httpx.AsyncClient", return_value=mock_client):
result = await _dispatch_via_api(
agent=AgentType.AGENT_API,
title="Analyse logs",
description="",
acceptance_criteria=[],
endpoint="http://fake-agent/dispatch",
)
assert result.status == DispatchStatus.FAILED
assert "500" in (result.error or "")
# ---------------------------------------------------------------------------
# _dispatch_via_gitea
# ---------------------------------------------------------------------------
_GITEA_SETTINGS = MagicMock(
gitea_enabled=True,
gitea_token="test-token",
gitea_url="http://gitea.test",
gitea_repo="owner/repo",
)
class TestDispatchViaGitea:
def _make_client(self, label_list=None, label_create_status=201, comment_status=201):
"""Build a mock httpx.AsyncClient for Gitea interactions."""
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = label_list or []
create_label_resp = MagicMock()
create_label_resp.status_code = label_create_status
create_label_resp.json.return_value = {"id": 99}
apply_label_resp = MagicMock()
apply_label_resp.status_code = 201
comment_resp = MagicMock()
comment_resp.status_code = comment_status
comment_resp.json.return_value = {"id": 7}
client = AsyncMock()
client.__aenter__ = AsyncMock(return_value=client)
client.__aexit__ = AsyncMock(return_value=False)
client.get = AsyncMock(return_value=label_resp)
client.post = AsyncMock(side_effect=[create_label_resp, apply_label_resp, comment_resp])
return client
async def test_successful_gitea_dispatch(self):
client = self._make_client()
with (
patch("httpx.AsyncClient", return_value=client),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
issue_number=1072,
title="Design the router",
description="We need a cascade router.",
acceptance_criteria=["Failover works"],
)
assert result.success
assert result.agent == AgentType.CLAUDE_CODE
assert result.issue_number == 1072
assert result.status == DispatchStatus.ASSIGNED
async def test_no_gitea_token_returns_failed(self):
bad_settings = MagicMock(gitea_enabled=True, gitea_token="", gitea_url="http://x", gitea_repo="a/b")
with patch("timmy.dispatcher.settings", bad_settings):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
issue_number=1,
title="Some task",
description="",
acceptance_criteria=[],
)
assert result.status == DispatchStatus.FAILED
assert "not configured" in (result.error or "").lower()
async def test_gitea_disabled_returns_failed(self):
bad_settings = MagicMock(gitea_enabled=False, gitea_token="tok", gitea_url="http://x", gitea_repo="a/b")
with patch("timmy.dispatcher.settings", bad_settings):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
issue_number=1,
title="Some task",
description="",
acceptance_criteria=[],
)
assert result.status == DispatchStatus.FAILED
async def test_existing_label_reused(self):
"""When the label already exists, it should be reused (no creation call)."""
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "claude-ready", "id": 55}]
apply_resp = MagicMock()
apply_resp.status_code = 201
comment_resp = MagicMock()
comment_resp.status_code = 201
comment_resp.json.return_value = {"id": 8}
client = AsyncMock()
client.__aenter__ = AsyncMock(return_value=client)
client.__aexit__ = AsyncMock(return_value=False)
client.get = AsyncMock(return_value=label_resp)
client.post = AsyncMock(side_effect=[apply_resp, comment_resp])
with (
patch("httpx.AsyncClient", return_value=client),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
issue_number=10,
title="Architecture task",
description="",
acceptance_criteria=[],
)
assert result.success
# Should only have 2 POST calls: apply label + comment (no label creation)
assert client.post.call_count == 2
# ---------------------------------------------------------------------------
# dispatch_task (integration-style)
# ---------------------------------------------------------------------------
class TestDispatchTask:
async def test_empty_title_returns_failed(self):
result = await dispatch_task(title=" ")
assert result.status == DispatchStatus.FAILED
assert "`title` is required" in (result.error or "")
async def test_local_dispatch_for_timmy_task(self):
result = await dispatch_task(
title="Triage the open issues",
description="We have 40 open issues.",
acceptance_criteria=["Issues are labelled"],
task_type=TaskType.TRIAGE,
)
assert result.agent == AgentType.TIMMY
assert result.success
async def test_explicit_agent_override(self):
"""Caller can force a specific agent regardless of task type."""
result = await dispatch_task(
title="Triage the open issues",
agent=AgentType.TIMMY,
)
assert result.agent == AgentType.TIMMY
async def test_gitea_dispatch_when_issue_provided(self):
client_mock = AsyncMock()
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
client_mock.__aexit__ = AsyncMock(return_value=False)
client_mock.get = AsyncMock(return_value=MagicMock(status_code=200, json=MagicMock(return_value=[])))
create_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 1}))
apply_resp = MagicMock(status_code=201)
comment_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 5}))
client_mock.post = AsyncMock(side_effect=[create_resp, apply_resp, comment_resp])
with (
patch("httpx.AsyncClient", return_value=client_mock),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
):
result = await dispatch_task(
title="Design the cascade router",
description="Architecture task.",
task_type=TaskType.ARCHITECTURE,
issue_number=1072,
)
assert result.agent == AgentType.CLAUDE_CODE
assert result.success
async def test_escalation_after_max_retries(self):
"""If all attempts fail, the result is ESCALATED."""
with (
patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch,
patch("timmy.dispatcher._log_escalation", new_callable=AsyncMock),
):
mock_dispatch.return_value = DispatchResult(
task_type=TaskType.ARCHITECTURE,
agent=AgentType.CLAUDE_CODE,
issue_number=1,
status=DispatchStatus.FAILED,
error="Gitea offline",
)
result = await dispatch_task(
title="Design router",
task_type=TaskType.ARCHITECTURE,
issue_number=1,
max_retries=1,
)
assert result.status == DispatchStatus.ESCALATED
assert mock_dispatch.call_count == 2 # initial + 1 retry
async def test_no_retry_on_success(self):
with patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch:
mock_dispatch.return_value = DispatchResult(
task_type=TaskType.ARCHITECTURE,
agent=AgentType.CLAUDE_CODE,
issue_number=1,
status=DispatchStatus.ASSIGNED,
comment_id=42,
label_applied="claude-ready",
)
result = await dispatch_task(
title="Design router",
task_type=TaskType.ARCHITECTURE,
issue_number=1,
max_retries=2,
)
assert result.success
assert mock_dispatch.call_count == 1 # no retries needed
# ---------------------------------------------------------------------------
# wait_for_completion
# ---------------------------------------------------------------------------
class TestWaitForCompletion:
async def test_returns_completed_when_issue_closed(self):
closed_resp = MagicMock(
status_code=200,
json=MagicMock(return_value={"state": "closed"}),
)
client_mock = AsyncMock()
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
client_mock.__aexit__ = AsyncMock(return_value=False)
client_mock.get = AsyncMock(return_value=closed_resp)
with (
patch("httpx.AsyncClient", return_value=client_mock),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
):
status = await wait_for_completion(issue_number=42, poll_interval=0, max_wait=5)
assert status == DispatchStatus.COMPLETED
async def test_returns_timed_out_when_still_open(self):
open_resp = MagicMock(
status_code=200,
json=MagicMock(return_value={"state": "open"}),
)
client_mock = AsyncMock()
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
client_mock.__aexit__ = AsyncMock(return_value=False)
client_mock.get = AsyncMock(return_value=open_resp)
with (
patch("httpx.AsyncClient", return_value=client_mock),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
patch("asyncio.sleep", new_callable=AsyncMock),
):
status = await wait_for_completion(issue_number=42, poll_interval=1, max_wait=2)
assert status == DispatchStatus.TIMED_OUT

21
tox.ini
View File

@@ -47,10 +47,12 @@ commands =
# ── Test Environments ────────────────────────────────────────────────────────
[testenv:unit]
description = Fast unit tests — only tests marked @pytest.mark.unit
description = Fast tests — excludes e2e, functional, and external services
commands =
pytest tests/ -q --tb=short \
-m "unit and not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
--ignore=tests/e2e \
--ignore=tests/functional \
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
-n auto --dist worksteal
[testenv:integration]
@@ -102,7 +104,7 @@ commands =
--cov-report=xml:reports/coverage.xml \
--cov-fail-under=73 \
--junitxml=reports/junit.xml \
-p no:xdist \
-n auto --dist worksteal \
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow"
[testenv:coverage]
@@ -113,7 +115,7 @@ commands =
--cov-report=term-missing \
--cov-report=xml \
--cov-fail-under=73 \
-p no:xdist \
-n auto --dist worksteal \
-m "not ollama and not docker and not selenium and not external_api and not slow"
[testenv:coverage-html]
@@ -124,7 +126,16 @@ commands =
--cov-report=term-missing \
--cov-report=html \
--cov-fail-under=73 \
-p no:xdist \
-n auto --dist worksteal \
-m "not ollama and not docker and not selenium and not external_api and not slow"
[testenv:coverage-parallel]
description = Parallel coverage report
commands =
pytest tests/ -q --tb=short \
--cov=src \
--cov-report=term-missing \
-n auto --dist worksteal \
-m "not ollama and not docker and not selenium and not external_api and not slow"
# ── Pre-push (mirrors CI exactly) ────────────────────────────────────────────