forked from Rockachopa/Timmy-time-dashboard
Compare commits
4 Commits
gemini/iss
...
claude/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
0b284972cb | ||
|
|
6c5f55230b | ||
| cf82bb0be4 | |||
| e492a51510 |
@@ -25,6 +25,19 @@ providers:
|
||||
tier: local
|
||||
url: "http://localhost:11434"
|
||||
models:
|
||||
# ── Dual-model routing: Qwen3-8B (fast) + Qwen3-14B (quality) ──────────
|
||||
# Both models fit simultaneously: ~6.6 GB + ~10.5 GB = ~17 GB combined.
|
||||
# Requires OLLAMA_MAX_LOADED_MODELS=2 (set in .env) to stay hot.
|
||||
# Ref: issue #1065 — Qwen3-8B/14B dual-model routing strategy
|
||||
- name: qwen3:8b
|
||||
context_window: 32768
|
||||
capabilities: [text, tools, json, streaming, routine]
|
||||
description: "Qwen3-8B Q6_K — fast router for routine tasks (~6.6 GB, 45-55 tok/s)"
|
||||
- name: qwen3:14b
|
||||
context_window: 40960
|
||||
capabilities: [text, tools, json, streaming, complex, reasoning]
|
||||
description: "Qwen3-14B Q5_K_M — complex reasoning and planning (~10.5 GB, 20-28 tok/s)"
|
||||
|
||||
# Text + Tools models
|
||||
- name: qwen3:30b
|
||||
default: true
|
||||
@@ -187,6 +200,20 @@ fallback_chains:
|
||||
- dolphin3 # base Dolphin 3.0 8B (uncensored, no custom system prompt)
|
||||
- qwen3:30b # primary fallback — usually sufficient with a good system prompt
|
||||
|
||||
# ── Complexity-based routing chains (issue #1065) ───────────────────────
|
||||
# Routine tasks: prefer Qwen3-8B for low latency (~45-55 tok/s)
|
||||
routine:
|
||||
- qwen3:8b # Primary fast model
|
||||
- llama3.1:8b-instruct # Fallback fast model
|
||||
- llama3.2:3b # Smallest available
|
||||
|
||||
# Complex tasks: prefer Qwen3-14B for quality (~20-28 tok/s)
|
||||
complex:
|
||||
- qwen3:14b # Primary quality model
|
||||
- hermes4-14b # Native tool calling, hybrid reasoning
|
||||
- qwen3:30b # Highest local quality
|
||||
- qwen2.5:14b # Additional fallback
|
||||
|
||||
# ── Custom Models ───────────────────────────────────────────────────────────
|
||||
# Register custom model weights for per-agent assignment.
|
||||
# Supports GGUF (Ollama), safetensors, and HuggingFace checkpoint dirs.
|
||||
|
||||
@@ -41,6 +41,13 @@ class Settings(BaseSettings):
|
||||
# 4096 keeps memory at ~19GB. Set to 0 to use model defaults.
|
||||
ollama_num_ctx: int = 4096
|
||||
|
||||
# Maximum models loaded simultaneously in Ollama — override with OLLAMA_MAX_LOADED_MODELS
|
||||
# Set to 2 so Qwen3-8B and Qwen3-14B can stay hot concurrently (~17 GB combined).
|
||||
# Requires Ollama ≥ 0.1.33. Export this to the Ollama process environment:
|
||||
# OLLAMA_MAX_LOADED_MODELS=2 ollama serve
|
||||
# or add it to your systemd/launchd unit before starting the harness.
|
||||
ollama_max_loaded_models: int = 2
|
||||
|
||||
# Fallback model chains — override with FALLBACK_MODELS / VISION_FALLBACK_MODELS
|
||||
# as comma-separated strings, e.g. FALLBACK_MODELS="qwen3:30b,llama3.1"
|
||||
# Or edit config/providers.yaml → fallback_chains for the canonical source.
|
||||
|
||||
@@ -2,6 +2,7 @@
|
||||
|
||||
from .api import router
|
||||
from .cascade import CascadeRouter, Provider, ProviderStatus, get_router
|
||||
from .classifier import TaskComplexity, classify_task
|
||||
from .history import HealthHistoryStore, get_history_store
|
||||
|
||||
__all__ = [
|
||||
@@ -12,4 +13,6 @@ __all__ = [
|
||||
"router",
|
||||
"HealthHistoryStore",
|
||||
"get_history_store",
|
||||
"TaskComplexity",
|
||||
"classify_task",
|
||||
]
|
||||
|
||||
@@ -528,6 +528,34 @@ class CascadeRouter:
|
||||
|
||||
return True
|
||||
|
||||
def _get_model_for_complexity(
|
||||
self, provider: Provider, complexity: "TaskComplexity"
|
||||
) -> str | None:
|
||||
"""Return the best model on *provider* for the given complexity tier.
|
||||
|
||||
Checks fallback chains first (routine / complex), then falls back to
|
||||
any model with the matching capability tag, then the provider default.
|
||||
"""
|
||||
from infrastructure.router.classifier import TaskComplexity
|
||||
|
||||
chain_key = "routine" if complexity == TaskComplexity.SIMPLE else "complex"
|
||||
|
||||
# Walk the capability fallback chain — first model present on this provider wins
|
||||
for model_name in self.config.fallback_chains.get(chain_key, []):
|
||||
if any(m["name"] == model_name for m in provider.models):
|
||||
return model_name
|
||||
|
||||
# Direct capability lookup — only return if a model explicitly has the tag
|
||||
# (do not use get_model_with_capability here as it falls back to the default)
|
||||
cap_model = next(
|
||||
(m["name"] for m in provider.models if chain_key in m.get("capabilities", [])),
|
||||
None,
|
||||
)
|
||||
if cap_model:
|
||||
return cap_model
|
||||
|
||||
return None # Caller will use provider default
|
||||
|
||||
async def complete(
|
||||
self,
|
||||
messages: list[dict],
|
||||
@@ -535,6 +563,7 @@ class CascadeRouter:
|
||||
temperature: float = 0.7,
|
||||
max_tokens: int | None = None,
|
||||
cascade_tier: str | None = None,
|
||||
complexity_hint: str | None = None,
|
||||
) -> dict:
|
||||
"""Complete a chat conversation with automatic failover.
|
||||
|
||||
@@ -543,24 +572,48 @@ class CascadeRouter:
|
||||
- Falls back to vision-capable models when needed
|
||||
- Supports image URLs, paths, and base64 encoding
|
||||
|
||||
Complexity-based routing (issue #1065):
|
||||
- ``complexity_hint="simple"`` → routes to Qwen3-8B (low-latency)
|
||||
- ``complexity_hint="complex"`` → routes to Qwen3-14B (quality)
|
||||
- ``complexity_hint=None`` (default) → auto-classifies from messages
|
||||
|
||||
Args:
|
||||
messages: List of message dicts with role and content
|
||||
model: Preferred model (tries this first, then provider defaults)
|
||||
model: Preferred model (tries this first; complexity routing is
|
||||
skipped when an explicit model is given)
|
||||
temperature: Sampling temperature
|
||||
max_tokens: Maximum tokens to generate
|
||||
cascade_tier: If specified, filters providers by this tier.
|
||||
- "frontier_required": Uses only Anthropic provider for top-tier models.
|
||||
complexity_hint: "simple", "complex", or None (auto-detect).
|
||||
|
||||
Returns:
|
||||
Dict with content, provider_used, and metrics
|
||||
Dict with content, provider_used, model, latency_ms,
|
||||
is_fallback_model, and complexity fields.
|
||||
|
||||
Raises:
|
||||
RuntimeError: If all providers fail
|
||||
"""
|
||||
from infrastructure.router.classifier import TaskComplexity, classify_task
|
||||
|
||||
content_type = self._detect_content_type(messages)
|
||||
if content_type != ContentType.TEXT:
|
||||
logger.debug("Detected %s content, selecting appropriate model", content_type.value)
|
||||
|
||||
# Resolve task complexity ─────────────────────────────────────────────
|
||||
# Skip complexity routing when caller explicitly specifies a model.
|
||||
complexity: TaskComplexity | None = None
|
||||
if model is None:
|
||||
if complexity_hint is not None:
|
||||
try:
|
||||
complexity = TaskComplexity(complexity_hint.lower())
|
||||
except ValueError:
|
||||
logger.warning("Unknown complexity_hint %r, auto-classifying", complexity_hint)
|
||||
complexity = classify_task(messages)
|
||||
else:
|
||||
complexity = classify_task(messages)
|
||||
logger.debug("Task complexity: %s", complexity.value)
|
||||
|
||||
errors = []
|
||||
|
||||
providers = self.providers
|
||||
@@ -573,7 +626,6 @@ class CascadeRouter:
|
||||
if not providers:
|
||||
raise RuntimeError(f"No providers found for tier: {cascade_tier}")
|
||||
|
||||
|
||||
for provider in providers:
|
||||
if not self._is_provider_available(provider):
|
||||
continue
|
||||
@@ -587,7 +639,21 @@ class CascadeRouter:
|
||||
)
|
||||
continue
|
||||
|
||||
selected_model, is_fallback_model = self._select_model(provider, model, content_type)
|
||||
# Complexity-based model selection (only when no explicit model) ──
|
||||
effective_model = model
|
||||
if effective_model is None and complexity is not None:
|
||||
effective_model = self._get_model_for_complexity(provider, complexity)
|
||||
if effective_model:
|
||||
logger.debug(
|
||||
"Complexity routing [%s]: %s → %s",
|
||||
complexity.value,
|
||||
provider.name,
|
||||
effective_model,
|
||||
)
|
||||
|
||||
selected_model, is_fallback_model = self._select_model(
|
||||
provider, effective_model, content_type
|
||||
)
|
||||
|
||||
try:
|
||||
result = await self._attempt_with_retry(
|
||||
@@ -610,6 +676,7 @@ class CascadeRouter:
|
||||
"model": result.get("model", selected_model or provider.get_default_model()),
|
||||
"latency_ms": result.get("latency_ms", 0),
|
||||
"is_fallback_model": is_fallback_model,
|
||||
"complexity": complexity.value if complexity is not None else None,
|
||||
}
|
||||
|
||||
raise RuntimeError(f"All providers failed: {'; '.join(errors)}")
|
||||
|
||||
166
src/infrastructure/router/classifier.py
Normal file
166
src/infrastructure/router/classifier.py
Normal file
@@ -0,0 +1,166 @@
|
||||
"""Task complexity classifier for Qwen3 dual-model routing.
|
||||
|
||||
Classifies incoming tasks as SIMPLE (route to Qwen3-8B for low-latency)
|
||||
or COMPLEX (route to Qwen3-14B for quality-sensitive work).
|
||||
|
||||
Classification is fully heuristic — no LLM inference required.
|
||||
"""
|
||||
|
||||
import re
|
||||
from enum import Enum
|
||||
|
||||
|
||||
class TaskComplexity(Enum):
|
||||
"""Task complexity tier for model routing."""
|
||||
|
||||
SIMPLE = "simple" # Qwen3-8B Q6_K: routine, latency-sensitive
|
||||
COMPLEX = "complex" # Qwen3-14B Q5_K_M: quality-sensitive, multi-step
|
||||
|
||||
|
||||
# Keywords strongly associated with complex tasks
|
||||
_COMPLEX_KEYWORDS: frozenset[str] = frozenset(
|
||||
[
|
||||
"plan",
|
||||
"review",
|
||||
"analyze",
|
||||
"analyse",
|
||||
"triage",
|
||||
"refactor",
|
||||
"design",
|
||||
"architecture",
|
||||
"implement",
|
||||
"compare",
|
||||
"debug",
|
||||
"explain",
|
||||
"prioritize",
|
||||
"prioritise",
|
||||
"strategy",
|
||||
"optimize",
|
||||
"optimise",
|
||||
"evaluate",
|
||||
"assess",
|
||||
"brainstorm",
|
||||
"outline",
|
||||
"summarize",
|
||||
"summarise",
|
||||
"generate code",
|
||||
"write a",
|
||||
"write the",
|
||||
"code review",
|
||||
"pull request",
|
||||
"multi-step",
|
||||
"multi step",
|
||||
"step by step",
|
||||
"backlog prioriti",
|
||||
"issue triage",
|
||||
"root cause",
|
||||
"how does",
|
||||
"why does",
|
||||
"what are the",
|
||||
]
|
||||
)
|
||||
|
||||
# Keywords strongly associated with simple/routine tasks
|
||||
_SIMPLE_KEYWORDS: frozenset[str] = frozenset(
|
||||
[
|
||||
"status",
|
||||
"list ",
|
||||
"show ",
|
||||
"what is",
|
||||
"how many",
|
||||
"ping",
|
||||
"run ",
|
||||
"execute ",
|
||||
"ls ",
|
||||
"cat ",
|
||||
"ps ",
|
||||
"fetch ",
|
||||
"count ",
|
||||
"tail ",
|
||||
"head ",
|
||||
"grep ",
|
||||
"find file",
|
||||
"read file",
|
||||
"get ",
|
||||
"query ",
|
||||
"check ",
|
||||
"yes",
|
||||
"no",
|
||||
"ok",
|
||||
"done",
|
||||
"thanks",
|
||||
]
|
||||
)
|
||||
|
||||
# Content longer than this is treated as complex regardless of keywords
|
||||
_COMPLEX_CHAR_THRESHOLD = 500
|
||||
|
||||
# Short content defaults to simple
|
||||
_SIMPLE_CHAR_THRESHOLD = 150
|
||||
|
||||
# More than this many messages suggests an ongoing complex conversation
|
||||
_COMPLEX_CONVERSATION_DEPTH = 6
|
||||
|
||||
|
||||
def classify_task(messages: list[dict]) -> TaskComplexity:
|
||||
"""Classify task complexity from a list of messages.
|
||||
|
||||
Uses heuristic rules — no LLM call required. Errs toward COMPLEX
|
||||
when uncertain so that quality is preserved.
|
||||
|
||||
Args:
|
||||
messages: List of message dicts with ``role`` and ``content`` keys.
|
||||
|
||||
Returns:
|
||||
TaskComplexity.SIMPLE or TaskComplexity.COMPLEX
|
||||
"""
|
||||
if not messages:
|
||||
return TaskComplexity.SIMPLE
|
||||
|
||||
# Concatenate all user-turn content for analysis
|
||||
user_content = " ".join(
|
||||
msg.get("content", "")
|
||||
for msg in messages
|
||||
if msg.get("role") in ("user", "human")
|
||||
and isinstance(msg.get("content"), str)
|
||||
).lower().strip()
|
||||
|
||||
if not user_content:
|
||||
return TaskComplexity.SIMPLE
|
||||
|
||||
# Complexity signals override everything -----------------------------------
|
||||
|
||||
# Explicit complex keywords
|
||||
for kw in _COMPLEX_KEYWORDS:
|
||||
if kw in user_content:
|
||||
return TaskComplexity.COMPLEX
|
||||
|
||||
# Numbered / multi-step instruction list: "1. do this 2. do that"
|
||||
if re.search(r"\b\d+\.\s+\w", user_content):
|
||||
return TaskComplexity.COMPLEX
|
||||
|
||||
# Code blocks embedded in messages
|
||||
if "```" in user_content:
|
||||
return TaskComplexity.COMPLEX
|
||||
|
||||
# Long content → complex reasoning likely required
|
||||
if len(user_content) > _COMPLEX_CHAR_THRESHOLD:
|
||||
return TaskComplexity.COMPLEX
|
||||
|
||||
# Deep conversation → complex ongoing task
|
||||
if len(messages) > _COMPLEX_CONVERSATION_DEPTH:
|
||||
return TaskComplexity.COMPLEX
|
||||
|
||||
# Simplicity signals -------------------------------------------------------
|
||||
|
||||
# Explicit simple keywords
|
||||
for kw in _SIMPLE_KEYWORDS:
|
||||
if kw in user_content:
|
||||
return TaskComplexity.SIMPLE
|
||||
|
||||
# Short single-sentence messages default to simple
|
||||
if len(user_content) <= _SIMPLE_CHAR_THRESHOLD:
|
||||
return TaskComplexity.SIMPLE
|
||||
|
||||
# When uncertain, prefer quality (complex model)
|
||||
return TaskComplexity.COMPLEX
|
||||
801
src/timmy/dispatcher.py
Normal file
801
src/timmy/dispatcher.py
Normal file
@@ -0,0 +1,801 @@
|
||||
"""Agent dispatcher — route tasks to Claude Code, Kimi, APIs, or Timmy itself.
|
||||
|
||||
Timmy's dispatch system: knows what agents are available, what they're good
|
||||
at, and how to send them work. Uses Gitea labels and issue comments to assign
|
||||
tasks and track completion.
|
||||
|
||||
Dispatch flow:
|
||||
1. Match task type to agent strengths
|
||||
2. Check agent availability (idle or working?)
|
||||
3. Dispatch task with full context (issue link, requirements, criteria)
|
||||
4. Log assignment as a Gitea comment
|
||||
5. Monitor for completion or timeout
|
||||
6. Review output quality
|
||||
7. If output fails QA → reassign or escalate
|
||||
|
||||
Agent interfaces:
|
||||
- Claude Code → ``claude-ready`` Gitea label + issue comment
|
||||
- Kimi Code → ``kimi-ready`` Gitea label + issue comment
|
||||
- Agent APIs → HTTP POST to external endpoint
|
||||
- Timmy (self) → direct local invocation
|
||||
|
||||
Usage::
|
||||
|
||||
from timmy.dispatcher import dispatch_task, TaskType, AgentType
|
||||
|
||||
result = await dispatch_task(
|
||||
issue_number=1072,
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
title="Design the LLM router",
|
||||
description="We need a cascade router...",
|
||||
acceptance_criteria=["Failover works", "Metrics exposed"],
|
||||
)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Enumerations
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class AgentType(str, Enum):
|
||||
"""Known agents in the swarm."""
|
||||
|
||||
CLAUDE_CODE = "claude_code"
|
||||
KIMI_CODE = "kimi_code"
|
||||
AGENT_API = "agent_api"
|
||||
TIMMY = "timmy"
|
||||
|
||||
|
||||
class TaskType(str, Enum):
|
||||
"""Categories of engineering work."""
|
||||
|
||||
# Claude Code strengths
|
||||
ARCHITECTURE = "architecture"
|
||||
REFACTORING = "refactoring"
|
||||
COMPLEX_REASONING = "complex_reasoning"
|
||||
CODE_REVIEW = "code_review"
|
||||
|
||||
# Kimi Code strengths
|
||||
PARALLEL_IMPLEMENTATION = "parallel_implementation"
|
||||
ROUTINE_CODING = "routine_coding"
|
||||
FAST_ITERATION = "fast_iteration"
|
||||
|
||||
# Agent API strengths
|
||||
RESEARCH = "research"
|
||||
ANALYSIS = "analysis"
|
||||
SPECIALIZED = "specialized"
|
||||
|
||||
# Timmy strengths
|
||||
TRIAGE = "triage"
|
||||
PLANNING = "planning"
|
||||
CREATIVE = "creative"
|
||||
ORCHESTRATION = "orchestration"
|
||||
|
||||
|
||||
class DispatchStatus(str, Enum):
|
||||
"""Lifecycle state of a dispatched task."""
|
||||
|
||||
PENDING = "pending"
|
||||
ASSIGNED = "assigned"
|
||||
IN_PROGRESS = "in_progress"
|
||||
COMPLETED = "completed"
|
||||
FAILED = "failed"
|
||||
ESCALATED = "escalated"
|
||||
TIMED_OUT = "timed_out"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class AgentSpec:
|
||||
"""Capabilities and limits for a single agent."""
|
||||
|
||||
name: AgentType
|
||||
display_name: str
|
||||
strengths: frozenset[TaskType]
|
||||
gitea_label: str | None # label to apply when dispatching
|
||||
max_concurrent: int = 1
|
||||
interface: str = "gitea" # "gitea" | "api" | "local"
|
||||
api_endpoint: str | None = None # for interface="api"
|
||||
|
||||
|
||||
#: Authoritative agent registry — all known agents and their capabilities.
|
||||
AGENT_REGISTRY: dict[AgentType, AgentSpec] = {
|
||||
AgentType.CLAUDE_CODE: AgentSpec(
|
||||
name=AgentType.CLAUDE_CODE,
|
||||
display_name="Claude Code",
|
||||
strengths=frozenset(
|
||||
{
|
||||
TaskType.ARCHITECTURE,
|
||||
TaskType.REFACTORING,
|
||||
TaskType.COMPLEX_REASONING,
|
||||
TaskType.CODE_REVIEW,
|
||||
}
|
||||
),
|
||||
gitea_label="claude-ready",
|
||||
max_concurrent=1,
|
||||
interface="gitea",
|
||||
),
|
||||
AgentType.KIMI_CODE: AgentSpec(
|
||||
name=AgentType.KIMI_CODE,
|
||||
display_name="Kimi Code",
|
||||
strengths=frozenset(
|
||||
{
|
||||
TaskType.PARALLEL_IMPLEMENTATION,
|
||||
TaskType.ROUTINE_CODING,
|
||||
TaskType.FAST_ITERATION,
|
||||
}
|
||||
),
|
||||
gitea_label="kimi-ready",
|
||||
max_concurrent=1,
|
||||
interface="gitea",
|
||||
),
|
||||
AgentType.AGENT_API: AgentSpec(
|
||||
name=AgentType.AGENT_API,
|
||||
display_name="Agent API",
|
||||
strengths=frozenset(
|
||||
{
|
||||
TaskType.RESEARCH,
|
||||
TaskType.ANALYSIS,
|
||||
TaskType.SPECIALIZED,
|
||||
}
|
||||
),
|
||||
gitea_label=None,
|
||||
max_concurrent=5,
|
||||
interface="api",
|
||||
),
|
||||
AgentType.TIMMY: AgentSpec(
|
||||
name=AgentType.TIMMY,
|
||||
display_name="Timmy",
|
||||
strengths=frozenset(
|
||||
{
|
||||
TaskType.TRIAGE,
|
||||
TaskType.PLANNING,
|
||||
TaskType.CREATIVE,
|
||||
TaskType.ORCHESTRATION,
|
||||
}
|
||||
),
|
||||
gitea_label=None,
|
||||
max_concurrent=1,
|
||||
interface="local",
|
||||
),
|
||||
}
|
||||
|
||||
#: Map from task type to preferred agent (primary routing table).
|
||||
_TASK_ROUTING: dict[TaskType, AgentType] = {
|
||||
TaskType.ARCHITECTURE: AgentType.CLAUDE_CODE,
|
||||
TaskType.REFACTORING: AgentType.CLAUDE_CODE,
|
||||
TaskType.COMPLEX_REASONING: AgentType.CLAUDE_CODE,
|
||||
TaskType.CODE_REVIEW: AgentType.CLAUDE_CODE,
|
||||
TaskType.PARALLEL_IMPLEMENTATION: AgentType.KIMI_CODE,
|
||||
TaskType.ROUTINE_CODING: AgentType.KIMI_CODE,
|
||||
TaskType.FAST_ITERATION: AgentType.KIMI_CODE,
|
||||
TaskType.RESEARCH: AgentType.AGENT_API,
|
||||
TaskType.ANALYSIS: AgentType.AGENT_API,
|
||||
TaskType.SPECIALIZED: AgentType.AGENT_API,
|
||||
TaskType.TRIAGE: AgentType.TIMMY,
|
||||
TaskType.PLANNING: AgentType.TIMMY,
|
||||
TaskType.CREATIVE: AgentType.TIMMY,
|
||||
TaskType.ORCHESTRATION: AgentType.TIMMY,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatch result
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class DispatchResult:
|
||||
"""Outcome of a dispatch call."""
|
||||
|
||||
task_type: TaskType
|
||||
agent: AgentType
|
||||
issue_number: int | None
|
||||
status: DispatchStatus
|
||||
comment_id: int | None = None
|
||||
label_applied: str | None = None
|
||||
error: str | None = None
|
||||
retry_count: int = 0
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
@property
|
||||
def success(self) -> bool: # noqa: D401
|
||||
return self.status in (DispatchStatus.ASSIGNED, DispatchStatus.COMPLETED)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routing logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def select_agent(task_type: TaskType) -> AgentType:
|
||||
"""Return the best agent for *task_type* based on the routing table.
|
||||
|
||||
Args:
|
||||
task_type: The category of engineering work to be done.
|
||||
|
||||
Returns:
|
||||
The :class:`AgentType` best suited to handle this task.
|
||||
"""
|
||||
return _TASK_ROUTING.get(task_type, AgentType.TIMMY)
|
||||
|
||||
|
||||
def infer_task_type(title: str, description: str = "") -> TaskType:
|
||||
"""Heuristic: guess the most appropriate :class:`TaskType` from text.
|
||||
|
||||
Scans *title* and *description* for keyword signals and returns the
|
||||
strongest match. Falls back to :attr:`TaskType.ROUTINE_CODING`.
|
||||
|
||||
Args:
|
||||
title: Short task title.
|
||||
description: Longer task description (optional).
|
||||
|
||||
Returns:
|
||||
The inferred :class:`TaskType`.
|
||||
"""
|
||||
text = (title + " " + description).lower()
|
||||
|
||||
_SIGNALS: list[tuple[TaskType, frozenset[str]]] = [
|
||||
(TaskType.ARCHITECTURE, frozenset({"architect", "design", "adr", "system design", "schema"})),
|
||||
(TaskType.REFACTORING, frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"})),
|
||||
(TaskType.CODE_REVIEW, frozenset({"review", "pr review", "pull request review", "audit"})),
|
||||
(TaskType.COMPLEX_REASONING, frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"})),
|
||||
(TaskType.RESEARCH, frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"})),
|
||||
(TaskType.ANALYSIS, frozenset({"analysis", "profil", "trace", "metric", "performance"})),
|
||||
(TaskType.TRIAGE, frozenset({"triage", "classify", "prioritise", "prioritize"})),
|
||||
(TaskType.PLANNING, frozenset({"plan", "roadmap", "milestone", "epic", "spike"})),
|
||||
(TaskType.CREATIVE, frozenset({"creative", "persona", "story", "write", "draft"})),
|
||||
(TaskType.ORCHESTRATION, frozenset({"orchestrat", "coordinat", "swarm", "dispatch"})),
|
||||
(TaskType.PARALLEL_IMPLEMENTATION, frozenset({"parallel", "concurrent", "batch"})),
|
||||
(TaskType.FAST_ITERATION, frozenset({"quick", "fast", "iterate", "prototype", "poc"})),
|
||||
]
|
||||
|
||||
for task_type, keywords in _SIGNALS:
|
||||
if any(kw in text for kw in keywords):
|
||||
return task_type
|
||||
|
||||
return TaskType.ROUTINE_CODING
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _post_gitea_comment(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
repo: str,
|
||||
headers: dict[str, str],
|
||||
issue_number: int,
|
||||
body: str,
|
||||
) -> int | None:
|
||||
"""Post a comment on a Gitea issue and return the comment ID."""
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
|
||||
headers=headers,
|
||||
json={"body": body},
|
||||
)
|
||||
if resp.status_code in (200, 201):
|
||||
return resp.json().get("id")
|
||||
logger.warning(
|
||||
"Comment on #%s returned %s: %s",
|
||||
issue_number,
|
||||
resp.status_code,
|
||||
resp.text[:200],
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to post comment on #%s: %s", issue_number, exc)
|
||||
return None
|
||||
|
||||
|
||||
async def _apply_gitea_label(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
repo: str,
|
||||
headers: dict[str, str],
|
||||
issue_number: int,
|
||||
label_name: str,
|
||||
label_color: str = "#0075ca",
|
||||
) -> bool:
|
||||
"""Ensure *label_name* exists and apply it to an issue.
|
||||
|
||||
Returns True if the label was successfully applied.
|
||||
"""
|
||||
# Resolve or create the label
|
||||
label_id: int | None = None
|
||||
try:
|
||||
resp = await client.get(f"{base_url}/repos/{repo}/labels", headers=headers)
|
||||
if resp.status_code == 200:
|
||||
for lbl in resp.json():
|
||||
if lbl.get("name") == label_name:
|
||||
label_id = lbl["id"]
|
||||
break
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to list labels: %s", exc)
|
||||
return False
|
||||
|
||||
if label_id is None:
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/labels",
|
||||
headers=headers,
|
||||
json={"name": label_name, "color": label_color},
|
||||
)
|
||||
if resp.status_code in (200, 201):
|
||||
label_id = resp.json().get("id")
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to create label %r: %s", label_name, exc)
|
||||
return False
|
||||
|
||||
if label_id is None:
|
||||
return False
|
||||
|
||||
# Apply label to the issue
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
|
||||
headers=headers,
|
||||
json={"labels": [label_id]},
|
||||
)
|
||||
return resp.status_code in (200, 201)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to apply label %r to #%s: %s", label_name, issue_number, exc)
|
||||
return False
|
||||
|
||||
|
||||
async def _poll_issue_completion(
|
||||
issue_number: int,
|
||||
poll_interval: int = 60,
|
||||
max_wait: int = 7200,
|
||||
) -> DispatchStatus:
|
||||
"""Poll a Gitea issue until closed (completed) or timeout.
|
||||
|
||||
Args:
|
||||
issue_number: Gitea issue to watch.
|
||||
poll_interval: Seconds between polls.
|
||||
max_wait: Maximum total seconds to wait.
|
||||
|
||||
Returns:
|
||||
:attr:`DispatchStatus.COMPLETED` if the issue was closed,
|
||||
:attr:`DispatchStatus.TIMED_OUT` otherwise.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
except ImportError as exc:
|
||||
logger.warning("poll_issue_completion: missing dependency: %s", exc)
|
||||
return DispatchStatus.FAILED
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"
|
||||
|
||||
elapsed = 0
|
||||
while elapsed < max_wait:
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.get(issue_url, headers=headers)
|
||||
if resp.status_code == 200 and resp.json().get("state") == "closed":
|
||||
logger.info("Issue #%s closed — task completed", issue_number)
|
||||
return DispatchStatus.COMPLETED
|
||||
except Exception as exc:
|
||||
logger.warning("Poll error for issue #%s: %s", issue_number, exc)
|
||||
|
||||
await asyncio.sleep(poll_interval)
|
||||
elapsed += poll_interval
|
||||
|
||||
logger.warning("Timed out waiting for issue #%s after %ss", issue_number, max_wait)
|
||||
return DispatchStatus.TIMED_OUT
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core dispatch functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _dispatch_via_gitea(
|
||||
agent: AgentType,
|
||||
issue_number: int,
|
||||
title: str,
|
||||
description: str,
|
||||
acceptance_criteria: list[str],
|
||||
) -> DispatchResult:
|
||||
"""Assign a task by applying a Gitea label and posting an assignment comment.
|
||||
|
||||
Args:
|
||||
agent: Target agent.
|
||||
issue_number: Gitea issue to assign.
|
||||
title: Short task title.
|
||||
description: Full task description.
|
||||
acceptance_criteria: List of acceptance criteria strings.
|
||||
|
||||
Returns:
|
||||
:class:`DispatchResult` describing the outcome.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
except ImportError as exc:
|
||||
return DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error=f"Missing dependency: {exc}",
|
||||
)
|
||||
|
||||
spec = AGENT_REGISTRY[agent]
|
||||
task_type = infer_task_type(title, description)
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error="Gitea integration not configured (no token or disabled).",
|
||||
)
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
comment_id: int | None = None
|
||||
label_applied: str | None = None
|
||||
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
# 1. Apply agent label (if applicable)
|
||||
if spec.gitea_label:
|
||||
ok = await _apply_gitea_label(
|
||||
client, base_url, repo, headers, issue_number, spec.gitea_label
|
||||
)
|
||||
if ok:
|
||||
label_applied = spec.gitea_label
|
||||
logger.info(
|
||||
"Applied label %r to issue #%s for %s",
|
||||
spec.gitea_label,
|
||||
issue_number,
|
||||
spec.display_name,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"Could not apply label %r to issue #%s",
|
||||
spec.gitea_label,
|
||||
issue_number,
|
||||
)
|
||||
|
||||
# 2. Post assignment comment
|
||||
criteria_md = "\n".join(f"- {c}" for c in acceptance_criteria) if acceptance_criteria else "_None specified_"
|
||||
comment_body = (
|
||||
f"## Assigned to {spec.display_name}\n\n"
|
||||
f"**Task type:** `{task_type.value}`\n\n"
|
||||
f"**Description:**\n{description}\n\n"
|
||||
f"**Acceptance criteria:**\n{criteria_md}\n\n"
|
||||
f"---\n*Dispatched by Timmy agent dispatcher.*"
|
||||
)
|
||||
comment_id = await _post_gitea_comment(
|
||||
client, base_url, repo, headers, issue_number, comment_body
|
||||
)
|
||||
|
||||
if comment_id is not None or label_applied is not None:
|
||||
logger.info(
|
||||
"Dispatched issue #%s to %s (label=%r, comment=%s)",
|
||||
issue_number,
|
||||
spec.display_name,
|
||||
label_applied,
|
||||
comment_id,
|
||||
)
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
comment_id=comment_id,
|
||||
label_applied=label_applied,
|
||||
)
|
||||
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error="Failed to apply label and post comment — check Gitea connectivity.",
|
||||
)
|
||||
|
||||
|
||||
async def _dispatch_via_api(
|
||||
agent: AgentType,
|
||||
title: str,
|
||||
description: str,
|
||||
acceptance_criteria: list[str],
|
||||
issue_number: int | None = None,
|
||||
endpoint: str | None = None,
|
||||
) -> DispatchResult:
|
||||
"""Dispatch a task to an external HTTP API agent.
|
||||
|
||||
Args:
|
||||
agent: Target agent.
|
||||
title: Short task title.
|
||||
description: Task description.
|
||||
acceptance_criteria: List of acceptance criteria.
|
||||
issue_number: Optional Gitea issue for cross-referencing.
|
||||
endpoint: Override API endpoint URL (uses spec default if omitted).
|
||||
|
||||
Returns:
|
||||
:class:`DispatchResult` describing the outcome.
|
||||
"""
|
||||
spec = AGENT_REGISTRY[agent]
|
||||
task_type = infer_task_type(title, description)
|
||||
url = endpoint or spec.api_endpoint
|
||||
|
||||
if not url:
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error=f"No API endpoint configured for agent {agent.value}.",
|
||||
)
|
||||
|
||||
payload = {
|
||||
"title": title,
|
||||
"description": description,
|
||||
"acceptance_criteria": acceptance_criteria,
|
||||
"issue_number": issue_number,
|
||||
"agent": agent.value,
|
||||
"task_type": task_type.value,
|
||||
}
|
||||
|
||||
try:
|
||||
import httpx
|
||||
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.post(url, json=payload)
|
||||
|
||||
if resp.status_code in (200, 201, 202):
|
||||
logger.info("Dispatched %r to API agent %s at %s", title[:60], agent.value, url)
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
metadata={"response": resp.json() if resp.content else {}},
|
||||
)
|
||||
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error=f"API agent returned {resp.status_code}: {resp.text[:200]}",
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("API dispatch to %s failed: %s", url, exc)
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
|
||||
async def _dispatch_local(
|
||||
title: str,
|
||||
description: str = "",
|
||||
acceptance_criteria: list[str] | None = None,
|
||||
issue_number: int | None = None,
|
||||
) -> DispatchResult:
|
||||
"""Handle a task locally — Timmy processes it directly.
|
||||
|
||||
This is a lightweight stub. Real local execution should be wired
|
||||
into the agentic loop or a dedicated Timmy tool.
|
||||
|
||||
Args:
|
||||
title: Short task title.
|
||||
description: Task description.
|
||||
acceptance_criteria: Acceptance criteria list.
|
||||
issue_number: Optional Gitea issue number for logging.
|
||||
|
||||
Returns:
|
||||
:class:`DispatchResult` with ASSIGNED status (local execution is
|
||||
assumed to succeed at dispatch time).
|
||||
"""
|
||||
task_type = infer_task_type(title, description)
|
||||
logger.info(
|
||||
"Timmy handling task locally: %r (issue #%s)", title[:60], issue_number
|
||||
)
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=AgentType.TIMMY,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
metadata={"local": True, "description": description},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def dispatch_task(
|
||||
title: str,
|
||||
description: str = "",
|
||||
acceptance_criteria: list[str] | None = None,
|
||||
task_type: TaskType | None = None,
|
||||
agent: AgentType | None = None,
|
||||
issue_number: int | None = None,
|
||||
api_endpoint: str | None = None,
|
||||
max_retries: int = 1,
|
||||
) -> DispatchResult:
|
||||
"""Route a task to the best available agent.
|
||||
|
||||
This is the primary entry point. Callers can either specify the
|
||||
*agent* and *task_type* explicitly or let the dispatcher infer them
|
||||
from the *title* and *description*.
|
||||
|
||||
Args:
|
||||
title: Short human-readable task title.
|
||||
description: Full task description with context.
|
||||
acceptance_criteria: List of acceptance criteria strings.
|
||||
task_type: Override automatic task type inference.
|
||||
agent: Override automatic agent selection.
|
||||
issue_number: Gitea issue number to log the assignment on.
|
||||
api_endpoint: Override API endpoint for AGENT_API dispatches.
|
||||
max_retries: Number of retry attempts on failure (default 1).
|
||||
|
||||
Returns:
|
||||
:class:`DispatchResult` describing the final dispatch outcome.
|
||||
|
||||
Example::
|
||||
|
||||
result = await dispatch_task(
|
||||
issue_number=1072,
|
||||
title="Build the cascade LLM router",
|
||||
description="We need automatic failover...",
|
||||
acceptance_criteria=["Circuit breaker works", "Metrics exposed"],
|
||||
)
|
||||
if result.success:
|
||||
print(f"Assigned to {result.agent.value}")
|
||||
"""
|
||||
criteria = acceptance_criteria or []
|
||||
|
||||
if not title.strip():
|
||||
return DispatchResult(
|
||||
task_type=task_type or TaskType.ROUTINE_CODING,
|
||||
agent=agent or AgentType.TIMMY,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error="`title` is required.",
|
||||
)
|
||||
|
||||
resolved_type = task_type or infer_task_type(title, description)
|
||||
resolved_agent = agent or select_agent(resolved_type)
|
||||
|
||||
logger.info(
|
||||
"Dispatching task %r → %s (type=%s, issue=#%s)",
|
||||
title[:60],
|
||||
resolved_agent.value,
|
||||
resolved_type.value,
|
||||
issue_number,
|
||||
)
|
||||
|
||||
spec = AGENT_REGISTRY[resolved_agent]
|
||||
|
||||
last_result: DispatchResult | None = None
|
||||
for attempt in range(max_retries + 1):
|
||||
if attempt > 0:
|
||||
logger.info("Retry %d/%d for task %r", attempt, max_retries, title[:60])
|
||||
|
||||
if spec.interface == "gitea" and issue_number is not None:
|
||||
result = await _dispatch_via_gitea(
|
||||
resolved_agent, issue_number, title, description, criteria
|
||||
)
|
||||
elif spec.interface == "api":
|
||||
result = await _dispatch_via_api(
|
||||
resolved_agent, title, description, criteria, issue_number, api_endpoint
|
||||
)
|
||||
else:
|
||||
result = await _dispatch_local(title, description, criteria, issue_number)
|
||||
|
||||
result.retry_count = attempt
|
||||
last_result = result
|
||||
|
||||
if result.success:
|
||||
return result
|
||||
|
||||
logger.warning(
|
||||
"Dispatch attempt %d failed for task %r: %s",
|
||||
attempt + 1,
|
||||
title[:60],
|
||||
result.error,
|
||||
)
|
||||
|
||||
# All attempts exhausted — escalate
|
||||
assert last_result is not None
|
||||
last_result.status = DispatchStatus.ESCALATED
|
||||
logger.error(
|
||||
"Task %r escalated after %d failed attempt(s): %s",
|
||||
title[:60],
|
||||
max_retries + 1,
|
||||
last_result.error,
|
||||
)
|
||||
|
||||
# Try to log the escalation on the issue
|
||||
if issue_number is not None:
|
||||
await _log_escalation(issue_number, resolved_agent, last_result.error or "unknown error")
|
||||
|
||||
return last_result
|
||||
|
||||
|
||||
async def _log_escalation(
|
||||
issue_number: int,
|
||||
agent: AgentType,
|
||||
error: str,
|
||||
) -> None:
|
||||
"""Post an escalation notice on the Gitea issue."""
|
||||
try:
|
||||
import httpx
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
body = (
|
||||
f"## Dispatch Escalated\n\n"
|
||||
f"Could not assign to **{AGENT_REGISTRY[agent].display_name}** "
|
||||
f"after {1} attempt(s).\n\n"
|
||||
f"**Error:** {error}\n\n"
|
||||
f"Manual intervention required.\n\n"
|
||||
f"---\n*Timmy agent dispatcher.*"
|
||||
)
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
await _post_gitea_comment(
|
||||
client, base_url, repo, headers, issue_number, body
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to post escalation comment: %s", exc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Monitoring helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def wait_for_completion(
|
||||
issue_number: int,
|
||||
poll_interval: int = 60,
|
||||
max_wait: int = 7200,
|
||||
) -> DispatchStatus:
|
||||
"""Block until the assigned Gitea issue is closed or the timeout fires.
|
||||
|
||||
Useful for synchronous orchestration where the caller wants to wait for
|
||||
the assigned agent to finish before proceeding.
|
||||
|
||||
Args:
|
||||
issue_number: Gitea issue to monitor.
|
||||
poll_interval: Seconds between status polls.
|
||||
max_wait: Maximum wait in seconds (default 2 hours).
|
||||
|
||||
Returns:
|
||||
:attr:`DispatchStatus.COMPLETED` or :attr:`DispatchStatus.TIMED_OUT`.
|
||||
"""
|
||||
return await _poll_issue_completion(issue_number, poll_interval, max_wait)
|
||||
@@ -968,3 +968,195 @@ class TestCascadeRouterReload:
|
||||
|
||||
assert router.providers[0].name == "low-priority"
|
||||
assert router.providers[1].name == "high-priority"
|
||||
|
||||
|
||||
class TestComplexityRouting:
|
||||
"""Tests for Qwen3-8B / Qwen3-14B dual-model routing (issue #1065)."""
|
||||
|
||||
def _make_dual_model_provider(self) -> Provider:
|
||||
"""Build an Ollama provider with both Qwen3 models registered."""
|
||||
return Provider(
|
||||
name="ollama-local",
|
||||
type="ollama",
|
||||
enabled=True,
|
||||
priority=1,
|
||||
url="http://localhost:11434",
|
||||
models=[
|
||||
{
|
||||
"name": "qwen3:8b",
|
||||
"capabilities": ["text", "tools", "json", "streaming", "routine"],
|
||||
},
|
||||
{
|
||||
"name": "qwen3:14b",
|
||||
"default": True,
|
||||
"capabilities": ["text", "tools", "json", "streaming", "complex", "reasoning"],
|
||||
},
|
||||
],
|
||||
)
|
||||
|
||||
def test_get_model_for_complexity_simple_returns_8b(self):
|
||||
"""Simple tasks should select the model with 'routine' capability."""
|
||||
from infrastructure.router.classifier import TaskComplexity
|
||||
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.config.fallback_chains = {
|
||||
"routine": ["qwen3:8b"],
|
||||
"complex": ["qwen3:14b"],
|
||||
}
|
||||
provider = self._make_dual_model_provider()
|
||||
|
||||
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
|
||||
assert model == "qwen3:8b"
|
||||
|
||||
def test_get_model_for_complexity_complex_returns_14b(self):
|
||||
"""Complex tasks should select the model with 'complex' capability."""
|
||||
from infrastructure.router.classifier import TaskComplexity
|
||||
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.config.fallback_chains = {
|
||||
"routine": ["qwen3:8b"],
|
||||
"complex": ["qwen3:14b"],
|
||||
}
|
||||
provider = self._make_dual_model_provider()
|
||||
|
||||
model = router._get_model_for_complexity(provider, TaskComplexity.COMPLEX)
|
||||
assert model == "qwen3:14b"
|
||||
|
||||
def test_get_model_for_complexity_returns_none_when_no_match(self):
|
||||
"""Returns None when provider has no matching model in chain."""
|
||||
from infrastructure.router.classifier import TaskComplexity
|
||||
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.config.fallback_chains = {} # empty chains
|
||||
|
||||
provider = Provider(
|
||||
name="test",
|
||||
type="ollama",
|
||||
enabled=True,
|
||||
priority=1,
|
||||
models=[{"name": "llama3.2:3b", "default": True, "capabilities": ["text"]}],
|
||||
)
|
||||
|
||||
# No 'routine' or 'complex' model available
|
||||
model = router._get_model_for_complexity(provider, TaskComplexity.SIMPLE)
|
||||
assert model is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_complete_with_simple_hint_routes_to_8b(self):
|
||||
"""complexity_hint='simple' should use qwen3:8b."""
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.config.fallback_chains = {
|
||||
"routine": ["qwen3:8b"],
|
||||
"complex": ["qwen3:14b"],
|
||||
}
|
||||
router.providers = [self._make_dual_model_provider()]
|
||||
|
||||
with patch.object(router, "_call_ollama") as mock_call:
|
||||
mock_call.return_value = {"content": "fast answer", "model": "qwen3:8b"}
|
||||
result = await router.complete(
|
||||
messages=[{"role": "user", "content": "list tasks"}],
|
||||
complexity_hint="simple",
|
||||
)
|
||||
|
||||
assert result["model"] == "qwen3:8b"
|
||||
assert result["complexity"] == "simple"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_complete_with_complex_hint_routes_to_14b(self):
|
||||
"""complexity_hint='complex' should use qwen3:14b."""
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.config.fallback_chains = {
|
||||
"routine": ["qwen3:8b"],
|
||||
"complex": ["qwen3:14b"],
|
||||
}
|
||||
router.providers = [self._make_dual_model_provider()]
|
||||
|
||||
with patch.object(router, "_call_ollama") as mock_call:
|
||||
mock_call.return_value = {"content": "detailed answer", "model": "qwen3:14b"}
|
||||
result = await router.complete(
|
||||
messages=[{"role": "user", "content": "review this PR"}],
|
||||
complexity_hint="complex",
|
||||
)
|
||||
|
||||
assert result["model"] == "qwen3:14b"
|
||||
assert result["complexity"] == "complex"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_explicit_model_bypasses_complexity_routing(self):
|
||||
"""When model is explicitly provided, complexity routing is skipped."""
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.config.fallback_chains = {
|
||||
"routine": ["qwen3:8b"],
|
||||
"complex": ["qwen3:14b"],
|
||||
}
|
||||
router.providers = [self._make_dual_model_provider()]
|
||||
|
||||
with patch.object(router, "_call_ollama") as mock_call:
|
||||
mock_call.return_value = {"content": "response", "model": "qwen3:14b"}
|
||||
result = await router.complete(
|
||||
messages=[{"role": "user", "content": "list tasks"}],
|
||||
model="qwen3:14b", # explicit override
|
||||
)
|
||||
|
||||
# Explicit model wins — complexity field is None
|
||||
assert result["model"] == "qwen3:14b"
|
||||
assert result["complexity"] is None
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_classification_routes_simple_message(self):
|
||||
"""Short, simple messages should auto-classify as SIMPLE → 8B."""
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.config.fallback_chains = {
|
||||
"routine": ["qwen3:8b"],
|
||||
"complex": ["qwen3:14b"],
|
||||
}
|
||||
router.providers = [self._make_dual_model_provider()]
|
||||
|
||||
with patch.object(router, "_call_ollama") as mock_call:
|
||||
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
|
||||
result = await router.complete(
|
||||
messages=[{"role": "user", "content": "status"}],
|
||||
# no complexity_hint — auto-classify
|
||||
)
|
||||
|
||||
assert result["complexity"] == "simple"
|
||||
assert result["model"] == "qwen3:8b"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_auto_classification_routes_complex_message(self):
|
||||
"""Complex messages should auto-classify → 14B."""
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.config.fallback_chains = {
|
||||
"routine": ["qwen3:8b"],
|
||||
"complex": ["qwen3:14b"],
|
||||
}
|
||||
router.providers = [self._make_dual_model_provider()]
|
||||
|
||||
with patch.object(router, "_call_ollama") as mock_call:
|
||||
mock_call.return_value = {"content": "deep analysis", "model": "qwen3:14b"}
|
||||
result = await router.complete(
|
||||
messages=[{"role": "user", "content": "analyze and prioritize the backlog"}],
|
||||
)
|
||||
|
||||
assert result["complexity"] == "complex"
|
||||
assert result["model"] == "qwen3:14b"
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_invalid_complexity_hint_falls_back_to_auto(self):
|
||||
"""Invalid complexity_hint should log a warning and auto-classify."""
|
||||
router = CascadeRouter(config_path=Path("/nonexistent"))
|
||||
router.config.fallback_chains = {
|
||||
"routine": ["qwen3:8b"],
|
||||
"complex": ["qwen3:14b"],
|
||||
}
|
||||
router.providers = [self._make_dual_model_provider()]
|
||||
|
||||
with patch.object(router, "_call_ollama") as mock_call:
|
||||
mock_call.return_value = {"content": "ok", "model": "qwen3:8b"}
|
||||
# Should not raise
|
||||
result = await router.complete(
|
||||
messages=[{"role": "user", "content": "status"}],
|
||||
complexity_hint="INVALID_HINT",
|
||||
)
|
||||
|
||||
assert result["complexity"] in ("simple", "complex") # auto-classified
|
||||
|
||||
134
tests/infrastructure/test_router_classifier.py
Normal file
134
tests/infrastructure/test_router_classifier.py
Normal file
@@ -0,0 +1,134 @@
|
||||
"""Tests for Qwen3 dual-model task complexity classifier."""
|
||||
|
||||
import pytest
|
||||
|
||||
from infrastructure.router.classifier import TaskComplexity, classify_task
|
||||
|
||||
|
||||
class TestClassifyTask:
|
||||
"""Tests for classify_task heuristics."""
|
||||
|
||||
# ── Simple / routine tasks ──────────────────────────────────────────────
|
||||
|
||||
def test_empty_messages_is_simple(self):
|
||||
assert classify_task([]) == TaskComplexity.SIMPLE
|
||||
|
||||
def test_no_user_content_is_simple(self):
|
||||
messages = [{"role": "system", "content": "You are Timmy."}]
|
||||
assert classify_task(messages) == TaskComplexity.SIMPLE
|
||||
|
||||
def test_short_status_query_is_simple(self):
|
||||
messages = [{"role": "user", "content": "status"}]
|
||||
assert classify_task(messages) == TaskComplexity.SIMPLE
|
||||
|
||||
def test_list_command_is_simple(self):
|
||||
messages = [{"role": "user", "content": "list all tasks"}]
|
||||
assert classify_task(messages) == TaskComplexity.SIMPLE
|
||||
|
||||
def test_get_command_is_simple(self):
|
||||
messages = [{"role": "user", "content": "get the latest log entry"}]
|
||||
assert classify_task(messages) == TaskComplexity.SIMPLE
|
||||
|
||||
def test_short_message_under_threshold_is_simple(self):
|
||||
messages = [{"role": "user", "content": "run the build"}]
|
||||
assert classify_task(messages) == TaskComplexity.SIMPLE
|
||||
|
||||
def test_affirmation_is_simple(self):
|
||||
messages = [{"role": "user", "content": "yes"}]
|
||||
assert classify_task(messages) == TaskComplexity.SIMPLE
|
||||
|
||||
# ── Complex / quality-sensitive tasks ──────────────────────────────────
|
||||
|
||||
def test_plan_keyword_is_complex(self):
|
||||
messages = [{"role": "user", "content": "plan the sprint"}]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_review_keyword_is_complex(self):
|
||||
messages = [{"role": "user", "content": "review this code"}]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_analyze_keyword_is_complex(self):
|
||||
messages = [{"role": "user", "content": "analyze performance"}]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_triage_keyword_is_complex(self):
|
||||
messages = [{"role": "user", "content": "triage the open issues"}]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_refactor_keyword_is_complex(self):
|
||||
messages = [{"role": "user", "content": "refactor the auth module"}]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_explain_keyword_is_complex(self):
|
||||
messages = [{"role": "user", "content": "explain how the router works"}]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_prioritize_keyword_is_complex(self):
|
||||
messages = [{"role": "user", "content": "prioritize the backlog"}]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_long_message_is_complex(self):
|
||||
long_msg = "do something " * 50 # > 500 chars
|
||||
messages = [{"role": "user", "content": long_msg}]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_numbered_list_is_complex(self):
|
||||
messages = [
|
||||
{
|
||||
"role": "user",
|
||||
"content": "1. Read the file 2. Analyze it 3. Write a report",
|
||||
}
|
||||
]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_code_block_is_complex(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "Here is the code:\n```python\nprint('hello')\n```"}
|
||||
]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_deep_conversation_is_complex(self):
|
||||
messages = [
|
||||
{"role": "user", "content": "hi"},
|
||||
{"role": "assistant", "content": "hello"},
|
||||
{"role": "user", "content": "ok"},
|
||||
{"role": "assistant", "content": "yes"},
|
||||
{"role": "user", "content": "ok"},
|
||||
{"role": "assistant", "content": "yes"},
|
||||
{"role": "user", "content": "now do the thing"},
|
||||
]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_analyse_british_spelling_is_complex(self):
|
||||
messages = [{"role": "user", "content": "analyse this dataset"}]
|
||||
assert classify_task(messages) == TaskComplexity.COMPLEX
|
||||
|
||||
def test_non_string_content_is_ignored(self):
|
||||
"""Non-string content should not crash the classifier."""
|
||||
messages = [{"role": "user", "content": ["part1", "part2"]}]
|
||||
# Should not raise; result doesn't matter — just must not blow up
|
||||
result = classify_task(messages)
|
||||
assert isinstance(result, TaskComplexity)
|
||||
|
||||
def test_system_message_not_counted_as_user(self):
|
||||
"""System message alone should not trigger complex keywords."""
|
||||
messages = [
|
||||
{"role": "system", "content": "analyze everything carefully"},
|
||||
{"role": "user", "content": "yes"},
|
||||
]
|
||||
# "analyze" is in system message (not user) — user says "yes" → simple
|
||||
assert classify_task(messages) == TaskComplexity.SIMPLE
|
||||
|
||||
|
||||
class TestTaskComplexityEnum:
|
||||
"""Tests for TaskComplexity enum values."""
|
||||
|
||||
def test_simple_value(self):
|
||||
assert TaskComplexity.SIMPLE.value == "simple"
|
||||
|
||||
def test_complex_value(self):
|
||||
assert TaskComplexity.COMPLEX.value == "complex"
|
||||
|
||||
def test_lookup_by_value(self):
|
||||
assert TaskComplexity("simple") == TaskComplexity.SIMPLE
|
||||
assert TaskComplexity("complex") == TaskComplexity.COMPLEX
|
||||
503
tests/timmy/test_dispatcher.py
Normal file
503
tests/timmy/test_dispatcher.py
Normal file
@@ -0,0 +1,503 @@
|
||||
"""Tests for the agent dispatcher (timmy.dispatcher)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.dispatcher import (
|
||||
AGENT_REGISTRY,
|
||||
AgentType,
|
||||
DispatchResult,
|
||||
DispatchStatus,
|
||||
TaskType,
|
||||
_dispatch_local,
|
||||
_dispatch_via_api,
|
||||
_dispatch_via_gitea,
|
||||
dispatch_task,
|
||||
infer_task_type,
|
||||
select_agent,
|
||||
wait_for_completion,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestAgentRegistry:
|
||||
def test_all_agents_present(self):
|
||||
for member in AgentType:
|
||||
assert member in AGENT_REGISTRY, f"AgentType.{member.name} missing from registry"
|
||||
|
||||
def test_agent_specs_have_display_names(self):
|
||||
for agent, spec in AGENT_REGISTRY.items():
|
||||
assert spec.display_name, f"{agent} has empty display_name"
|
||||
|
||||
def test_gitea_agents_have_labels(self):
|
||||
for agent, spec in AGENT_REGISTRY.items():
|
||||
if spec.interface == "gitea":
|
||||
assert spec.gitea_label, f"{agent} is gitea interface but has no label"
|
||||
|
||||
def test_non_gitea_agents_have_no_labels(self):
|
||||
for agent, spec in AGENT_REGISTRY.items():
|
||||
if spec.interface not in ("gitea",):
|
||||
# api and local agents may have no label
|
||||
assert spec.gitea_label is None or spec.interface == "gitea"
|
||||
|
||||
def test_max_concurrent_positive(self):
|
||||
for agent, spec in AGENT_REGISTRY.items():
|
||||
assert spec.max_concurrent >= 1, f"{agent} has max_concurrent < 1"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# select_agent
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSelectAgent:
|
||||
def test_architecture_routes_to_claude(self):
|
||||
assert select_agent(TaskType.ARCHITECTURE) == AgentType.CLAUDE_CODE
|
||||
|
||||
def test_refactoring_routes_to_claude(self):
|
||||
assert select_agent(TaskType.REFACTORING) == AgentType.CLAUDE_CODE
|
||||
|
||||
def test_code_review_routes_to_claude(self):
|
||||
assert select_agent(TaskType.CODE_REVIEW) == AgentType.CLAUDE_CODE
|
||||
|
||||
def test_routine_coding_routes_to_kimi(self):
|
||||
assert select_agent(TaskType.ROUTINE_CODING) == AgentType.KIMI_CODE
|
||||
|
||||
def test_fast_iteration_routes_to_kimi(self):
|
||||
assert select_agent(TaskType.FAST_ITERATION) == AgentType.KIMI_CODE
|
||||
|
||||
def test_research_routes_to_agent_api(self):
|
||||
assert select_agent(TaskType.RESEARCH) == AgentType.AGENT_API
|
||||
|
||||
def test_triage_routes_to_timmy(self):
|
||||
assert select_agent(TaskType.TRIAGE) == AgentType.TIMMY
|
||||
|
||||
def test_planning_routes_to_timmy(self):
|
||||
assert select_agent(TaskType.PLANNING) == AgentType.TIMMY
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# infer_task_type
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestInferTaskType:
|
||||
def test_architecture_keyword(self):
|
||||
assert infer_task_type("Design the LLM router architecture") == TaskType.ARCHITECTURE
|
||||
|
||||
def test_refactor_keyword(self):
|
||||
assert infer_task_type("Refactor the auth middleware") == TaskType.REFACTORING
|
||||
|
||||
def test_code_review_keyword(self):
|
||||
assert infer_task_type("Review PR for cascade router") == TaskType.CODE_REVIEW
|
||||
|
||||
def test_research_keyword(self):
|
||||
assert infer_task_type("Research embedding models") == TaskType.RESEARCH
|
||||
|
||||
def test_triage_keyword(self):
|
||||
assert infer_task_type("Triage open issues") == TaskType.TRIAGE
|
||||
|
||||
def test_planning_keyword(self):
|
||||
assert infer_task_type("Plan the v2.0 roadmap") == TaskType.PLANNING
|
||||
|
||||
def test_fallback_returns_routine_coding(self):
|
||||
assert infer_task_type("Do the thing") == TaskType.ROUTINE_CODING
|
||||
|
||||
def test_description_contributes_to_inference(self):
|
||||
result = infer_task_type("Implement feature", "We need to refactor the old code")
|
||||
assert result == TaskType.REFACTORING
|
||||
|
||||
def test_case_insensitive(self):
|
||||
assert infer_task_type("ARCHITECTURE DESIGN") == TaskType.ARCHITECTURE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DispatchResult
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchResult:
|
||||
def test_success_when_assigned(self):
|
||||
r = DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=AgentType.KIMI_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
)
|
||||
assert r.success is True
|
||||
|
||||
def test_success_when_completed(self):
|
||||
r = DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=AgentType.KIMI_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.COMPLETED,
|
||||
)
|
||||
assert r.success is True
|
||||
|
||||
def test_not_success_when_failed(self):
|
||||
r = DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=AgentType.KIMI_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.FAILED,
|
||||
)
|
||||
assert r.success is False
|
||||
|
||||
def test_not_success_when_escalated(self):
|
||||
r = DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=AgentType.KIMI_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.ESCALATED,
|
||||
)
|
||||
assert r.success is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _dispatch_local
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchLocal:
|
||||
async def test_returns_assigned(self):
|
||||
result = await _dispatch_local(
|
||||
title="Plan the migration",
|
||||
description="We need a plan.",
|
||||
acceptance_criteria=["Plan is documented"],
|
||||
issue_number=42,
|
||||
)
|
||||
assert result.status == DispatchStatus.ASSIGNED
|
||||
assert result.agent == AgentType.TIMMY
|
||||
assert result.issue_number == 42
|
||||
|
||||
async def test_infers_task_type(self):
|
||||
result = await _dispatch_local(
|
||||
title="Plan the sprint",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
assert result.task_type == TaskType.PLANNING
|
||||
|
||||
async def test_no_issue_number(self):
|
||||
result = await _dispatch_local(title="Do something", description="")
|
||||
assert result.issue_number is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _dispatch_via_api
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchViaApi:
|
||||
async def test_no_endpoint_returns_failed(self):
|
||||
result = await _dispatch_via_api(
|
||||
agent=AgentType.AGENT_API,
|
||||
title="Analyse logs",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
assert "No API endpoint" in (result.error or "")
|
||||
|
||||
async def test_successful_api_call(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 202
|
||||
mock_resp.content = b'{"ok": true}'
|
||||
mock_resp.json.return_value = {"ok": True}
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_client.__aexit__ = AsyncMock(return_value=False)
|
||||
mock_client.post = AsyncMock(return_value=mock_resp)
|
||||
|
||||
with patch("httpx.AsyncClient", return_value=mock_client):
|
||||
result = await _dispatch_via_api(
|
||||
agent=AgentType.AGENT_API,
|
||||
title="Analyse logs",
|
||||
description="Look at the logs",
|
||||
acceptance_criteria=["Report produced"],
|
||||
endpoint="http://fake-agent/dispatch",
|
||||
)
|
||||
|
||||
assert result.status == DispatchStatus.ASSIGNED
|
||||
assert result.agent == AgentType.AGENT_API
|
||||
|
||||
async def test_api_error_returns_failed(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 500
|
||||
mock_resp.text = "Internal Server Error"
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_client.__aexit__ = AsyncMock(return_value=False)
|
||||
mock_client.post = AsyncMock(return_value=mock_resp)
|
||||
|
||||
with patch("httpx.AsyncClient", return_value=mock_client):
|
||||
result = await _dispatch_via_api(
|
||||
agent=AgentType.AGENT_API,
|
||||
title="Analyse logs",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
endpoint="http://fake-agent/dispatch",
|
||||
)
|
||||
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
assert "500" in (result.error or "")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _dispatch_via_gitea
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_GITEA_SETTINGS = MagicMock(
|
||||
gitea_enabled=True,
|
||||
gitea_token="test-token",
|
||||
gitea_url="http://gitea.test",
|
||||
gitea_repo="owner/repo",
|
||||
)
|
||||
|
||||
|
||||
class TestDispatchViaGitea:
|
||||
def _make_client(self, label_list=None, label_create_status=201, comment_status=201):
|
||||
"""Build a mock httpx.AsyncClient for Gitea interactions."""
|
||||
label_resp = MagicMock()
|
||||
label_resp.status_code = 200
|
||||
label_resp.json.return_value = label_list or []
|
||||
|
||||
create_label_resp = MagicMock()
|
||||
create_label_resp.status_code = label_create_status
|
||||
create_label_resp.json.return_value = {"id": 99}
|
||||
|
||||
apply_label_resp = MagicMock()
|
||||
apply_label_resp.status_code = 201
|
||||
|
||||
comment_resp = MagicMock()
|
||||
comment_resp.status_code = comment_status
|
||||
comment_resp.json.return_value = {"id": 7}
|
||||
|
||||
client = AsyncMock()
|
||||
client.__aenter__ = AsyncMock(return_value=client)
|
||||
client.__aexit__ = AsyncMock(return_value=False)
|
||||
client.get = AsyncMock(return_value=label_resp)
|
||||
client.post = AsyncMock(side_effect=[create_label_resp, apply_label_resp, comment_resp])
|
||||
return client
|
||||
|
||||
async def test_successful_gitea_dispatch(self):
|
||||
client = self._make_client()
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
result = await _dispatch_via_gitea(
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1072,
|
||||
title="Design the router",
|
||||
description="We need a cascade router.",
|
||||
acceptance_criteria=["Failover works"],
|
||||
)
|
||||
|
||||
assert result.success
|
||||
assert result.agent == AgentType.CLAUDE_CODE
|
||||
assert result.issue_number == 1072
|
||||
assert result.status == DispatchStatus.ASSIGNED
|
||||
|
||||
async def test_no_gitea_token_returns_failed(self):
|
||||
bad_settings = MagicMock(gitea_enabled=True, gitea_token="", gitea_url="http://x", gitea_repo="a/b")
|
||||
with patch("timmy.dispatcher.settings", bad_settings):
|
||||
result = await _dispatch_via_gitea(
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
title="Some task",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
assert "not configured" in (result.error or "").lower()
|
||||
|
||||
async def test_gitea_disabled_returns_failed(self):
|
||||
bad_settings = MagicMock(gitea_enabled=False, gitea_token="tok", gitea_url="http://x", gitea_repo="a/b")
|
||||
with patch("timmy.dispatcher.settings", bad_settings):
|
||||
result = await _dispatch_via_gitea(
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
title="Some task",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
|
||||
async def test_existing_label_reused(self):
|
||||
"""When the label already exists, it should be reused (no creation call)."""
|
||||
label_resp = MagicMock()
|
||||
label_resp.status_code = 200
|
||||
label_resp.json.return_value = [{"name": "claude-ready", "id": 55}]
|
||||
|
||||
apply_resp = MagicMock()
|
||||
apply_resp.status_code = 201
|
||||
|
||||
comment_resp = MagicMock()
|
||||
comment_resp.status_code = 201
|
||||
comment_resp.json.return_value = {"id": 8}
|
||||
|
||||
client = AsyncMock()
|
||||
client.__aenter__ = AsyncMock(return_value=client)
|
||||
client.__aexit__ = AsyncMock(return_value=False)
|
||||
client.get = AsyncMock(return_value=label_resp)
|
||||
client.post = AsyncMock(side_effect=[apply_resp, comment_resp])
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
result = await _dispatch_via_gitea(
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=10,
|
||||
title="Architecture task",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
|
||||
assert result.success
|
||||
# Should only have 2 POST calls: apply label + comment (no label creation)
|
||||
assert client.post.call_count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# dispatch_task (integration-style)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchTask:
|
||||
async def test_empty_title_returns_failed(self):
|
||||
result = await dispatch_task(title=" ")
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
assert "`title` is required" in (result.error or "")
|
||||
|
||||
async def test_local_dispatch_for_timmy_task(self):
|
||||
result = await dispatch_task(
|
||||
title="Triage the open issues",
|
||||
description="We have 40 open issues.",
|
||||
acceptance_criteria=["Issues are labelled"],
|
||||
task_type=TaskType.TRIAGE,
|
||||
)
|
||||
assert result.agent == AgentType.TIMMY
|
||||
assert result.success
|
||||
|
||||
async def test_explicit_agent_override(self):
|
||||
"""Caller can force a specific agent regardless of task type."""
|
||||
result = await dispatch_task(
|
||||
title="Triage the open issues",
|
||||
agent=AgentType.TIMMY,
|
||||
)
|
||||
assert result.agent == AgentType.TIMMY
|
||||
|
||||
async def test_gitea_dispatch_when_issue_provided(self):
|
||||
client_mock = AsyncMock()
|
||||
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
||||
client_mock.__aexit__ = AsyncMock(return_value=False)
|
||||
client_mock.get = AsyncMock(return_value=MagicMock(status_code=200, json=MagicMock(return_value=[])))
|
||||
create_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 1}))
|
||||
apply_resp = MagicMock(status_code=201)
|
||||
comment_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 5}))
|
||||
client_mock.post = AsyncMock(side_effect=[create_resp, apply_resp, comment_resp])
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client_mock),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
result = await dispatch_task(
|
||||
title="Design the cascade router",
|
||||
description="Architecture task.",
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
issue_number=1072,
|
||||
)
|
||||
|
||||
assert result.agent == AgentType.CLAUDE_CODE
|
||||
assert result.success
|
||||
|
||||
async def test_escalation_after_max_retries(self):
|
||||
"""If all attempts fail, the result is ESCALATED."""
|
||||
with (
|
||||
patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch,
|
||||
patch("timmy.dispatcher._log_escalation", new_callable=AsyncMock),
|
||||
):
|
||||
mock_dispatch.return_value = DispatchResult(
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.FAILED,
|
||||
error="Gitea offline",
|
||||
)
|
||||
result = await dispatch_task(
|
||||
title="Design router",
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
issue_number=1,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
assert result.status == DispatchStatus.ESCALATED
|
||||
assert mock_dispatch.call_count == 2 # initial + 1 retry
|
||||
|
||||
async def test_no_retry_on_success(self):
|
||||
with patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch:
|
||||
mock_dispatch.return_value = DispatchResult(
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
comment_id=42,
|
||||
label_applied="claude-ready",
|
||||
)
|
||||
result = await dispatch_task(
|
||||
title="Design router",
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
issue_number=1,
|
||||
max_retries=2,
|
||||
)
|
||||
|
||||
assert result.success
|
||||
assert mock_dispatch.call_count == 1 # no retries needed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# wait_for_completion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWaitForCompletion:
|
||||
async def test_returns_completed_when_issue_closed(self):
|
||||
closed_resp = MagicMock(
|
||||
status_code=200,
|
||||
json=MagicMock(return_value={"state": "closed"}),
|
||||
)
|
||||
client_mock = AsyncMock()
|
||||
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
||||
client_mock.__aexit__ = AsyncMock(return_value=False)
|
||||
client_mock.get = AsyncMock(return_value=closed_resp)
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client_mock),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
status = await wait_for_completion(issue_number=42, poll_interval=0, max_wait=5)
|
||||
|
||||
assert status == DispatchStatus.COMPLETED
|
||||
|
||||
async def test_returns_timed_out_when_still_open(self):
|
||||
open_resp = MagicMock(
|
||||
status_code=200,
|
||||
json=MagicMock(return_value={"state": "open"}),
|
||||
)
|
||||
client_mock = AsyncMock()
|
||||
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
||||
client_mock.__aexit__ = AsyncMock(return_value=False)
|
||||
client_mock.get = AsyncMock(return_value=open_resp)
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client_mock),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
patch("asyncio.sleep", new_callable=AsyncMock),
|
||||
):
|
||||
status = await wait_for_completion(issue_number=42, poll_interval=1, max_wait=2)
|
||||
|
||||
assert status == DispatchStatus.TIMED_OUT
|
||||
6
tox.ini
6
tox.ini
@@ -47,12 +47,10 @@ commands =
|
||||
# ── Test Environments ────────────────────────────────────────────────────────
|
||||
|
||||
[testenv:unit]
|
||||
description = Fast tests — excludes e2e, functional, and external services
|
||||
description = Fast unit tests — only tests marked @pytest.mark.unit
|
||||
commands =
|
||||
pytest tests/ -q --tb=short \
|
||||
--ignore=tests/e2e \
|
||||
--ignore=tests/functional \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
|
||||
-m "unit and not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
|
||||
-n auto --dist worksteal
|
||||
|
||||
[testenv:integration]
|
||||
|
||||
Reference in New Issue
Block a user