Compare commits
6 Commits
gemini/iss
...
gemini/iss
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f79899e283 | ||
| 128aa4427f | |||
| 4f8e86348c | |||
| 0c627f175b | |||
| cf82bb0be4 | |||
| e492a51510 |
@@ -304,6 +304,16 @@ class Settings(BaseSettings):
|
||||
mcp_timeout: int = 15
|
||||
mcp_bridge_timeout: int = 60 # HTTP timeout for MCP bridge Ollama calls (seconds)
|
||||
|
||||
# ── Backlog Triage Loop ────────────────────────────────────────────
|
||||
# Autonomous loop: fetch open issues, score, assign to agents.
|
||||
backlog_triage_enabled: bool = False
|
||||
# Seconds between triage cycles (default: 15 minutes).
|
||||
backlog_triage_interval_seconds: int = 900
|
||||
# When True, score and summarize but don't write to Gitea.
|
||||
backlog_triage_dry_run: bool = False
|
||||
# Create a daily triage summary issue/comment.
|
||||
backlog_triage_daily_summary: bool = True
|
||||
|
||||
# ── Loop QA (Self-Testing) ─────────────────────────────────────────
|
||||
# Self-test orchestrator that probes capabilities alongside the thinking loop.
|
||||
loop_qa_enabled: bool = True
|
||||
@@ -311,6 +321,15 @@ class Settings(BaseSettings):
|
||||
loop_qa_upgrade_threshold: int = 3 # consecutive failures → file task
|
||||
loop_qa_max_per_hour: int = 12 # safety throttle
|
||||
|
||||
# ── Vassal Protocol (Autonomous Orchestrator) ─────────────────────
|
||||
# Timmy as lead decision-maker: triage backlog, dispatch agents, monitor health.
|
||||
# See timmy/vassal/ for implementation.
|
||||
vassal_enabled: bool = False # off by default — enable when Qwen3-14B is loaded
|
||||
vassal_cycle_interval: int = 300 # seconds between orchestration cycles (5 min)
|
||||
vassal_max_dispatch_per_cycle: int = 10 # cap on new dispatches per cycle
|
||||
vassal_stuck_threshold_minutes: int = 120 # minutes before agent issue is "stuck"
|
||||
vassal_idle_threshold_minutes: int = 30 # minutes before agent is "idle"
|
||||
|
||||
# ── Paperclip AI — orchestration bridge ────────────────────────────
|
||||
# URL where the Paperclip server listens.
|
||||
# For VPS deployment behind nginx, use the public domain.
|
||||
|
||||
759
src/timmy/backlog_triage.py
Normal file
759
src/timmy/backlog_triage.py
Normal file
@@ -0,0 +1,759 @@
|
||||
"""Autonomous backlog triage loop — Timmy scans Gitea and assigns work.
|
||||
|
||||
Continuously fetches open issues, scores/prioritizes them, and decides
|
||||
what to work on next without waiting to be asked.
|
||||
|
||||
Loop flow::
|
||||
|
||||
while true:
|
||||
1. Fetch all open issues from Gitea API
|
||||
2. Score/prioritize by labels, age, type, blocked status
|
||||
3. Identify unassigned high-priority items
|
||||
4. Decide: assign to claude, dispatch to kimi, or flag for Alex
|
||||
5. Execute the assignment (comment + assign)
|
||||
6. Optionally post a daily triage summary
|
||||
7. Sleep for configurable interval (default 15 min)
|
||||
|
||||
Priority tiers:
|
||||
P0 — security, data loss, blocking bugs → immediate action
|
||||
P1 — core functionality, ready issues → next sprint
|
||||
P2 — improvements, low-score issues → backlog
|
||||
P3 — philosophy, meta → someday/never (skip in triage)
|
||||
|
||||
Usage::
|
||||
|
||||
from timmy.backlog_triage import BacklogTriageLoop
|
||||
|
||||
loop = BacklogTriageLoop()
|
||||
await loop.run_once() # single triage cycle
|
||||
await loop.start() # background daemon loop
|
||||
loop.stop() # graceful shutdown
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import re
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ── Constants ────────────────────────────────────────────────────────────────
|
||||
|
||||
# Minimum triage score to be considered "ready" for assignment
|
||||
READY_THRESHOLD = 5
|
||||
|
||||
# Agent Gitea logins
|
||||
AGENT_CLAUDE = "claude"
|
||||
AGENT_KIMI = "kimi"
|
||||
OWNER_LOGIN = "rockachopa" # Alex — human owner
|
||||
|
||||
# Labels
|
||||
KIMI_READY_LABEL = "kimi-ready"
|
||||
TRIAGE_DONE_LABEL = "triage-done"
|
||||
|
||||
# Tag sets (mirrors scripts/triage_score.py)
|
||||
_BUG_TAGS = frozenset({"bug", "broken", "crash", "error", "fix", "regression", "hotfix"})
|
||||
_FEATURE_TAGS = frozenset({"feature", "feat", "enhancement", "capability", "timmy-capability"})
|
||||
_REFACTOR_TAGS = frozenset({"refactor", "cleanup", "tech-debt", "optimization", "perf"})
|
||||
_META_TAGS = frozenset({"philosophy", "soul-gap", "discussion", "question", "rfc"})
|
||||
_P0_TAGS = frozenset({"security", "data-loss", "blocking", "p0", "critical"})
|
||||
_RESEARCH_TAGS = frozenset({"research", "kimi-ready", "investigation", "spike"})
|
||||
_LOOP_TAG = "loop-generated"
|
||||
|
||||
# Regex patterns for scoring
|
||||
_TAG_RE = re.compile(r"\[([^\]]+)\]")
|
||||
_FILE_RE = re.compile(r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE)
|
||||
_FUNC_RE = re.compile(r"(?:def |class |function |method |`\w+\(\)`)", re.IGNORECASE)
|
||||
_ACCEPT_RE = re.compile(
|
||||
r"(?:should|must|expect|verify|assert|test.?case|acceptance|criteria"
|
||||
r"|pass(?:es|ing)|fail(?:s|ing)|return(?:s)?|raise(?:s)?)",
|
||||
re.IGNORECASE,
|
||||
)
|
||||
_TEST_RE = re.compile(r"(?:tox|pytest|test_\w+|\.test\.|assert\s)", re.IGNORECASE)
|
||||
_BLOCKED_RE = re.compile(r"\bblock(?:ed|s|ing)\b", re.IGNORECASE)
|
||||
|
||||
|
||||
# ── Data types ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
@dataclass
|
||||
class ScoredIssue:
|
||||
"""A Gitea issue enriched with triage scoring."""
|
||||
|
||||
number: int
|
||||
title: str
|
||||
body: str
|
||||
labels: list[str]
|
||||
tags: set[str]
|
||||
assignees: list[str]
|
||||
created_at: datetime
|
||||
issue_type: str # bug | feature | refactor | philosophy | research | unknown
|
||||
|
||||
score: int = 0
|
||||
scope: int = 0
|
||||
acceptance: int = 0
|
||||
alignment: int = 0
|
||||
ready: bool = False
|
||||
age_days: int = 0
|
||||
is_p0: bool = False
|
||||
is_blocked: bool = False
|
||||
|
||||
@property
|
||||
def is_unassigned(self) -> bool:
|
||||
return len(self.assignees) == 0
|
||||
|
||||
@property
|
||||
def needs_kimi(self) -> bool:
|
||||
return bool(self.tags & _RESEARCH_TAGS) or KIMI_READY_LABEL in self.labels
|
||||
|
||||
|
||||
@dataclass
|
||||
class TriageDecision:
|
||||
"""The outcome of a triage decision for a single issue."""
|
||||
|
||||
issue_number: int
|
||||
action: str # "assign_claude" | "assign_kimi" | "flag_alex" | "skip"
|
||||
reason: str
|
||||
agent: str = "" # the agent assigned (login)
|
||||
executed: bool = False
|
||||
error: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class TriageCycleResult:
|
||||
"""Summary of one complete triage cycle."""
|
||||
|
||||
timestamp: str
|
||||
total_open: int
|
||||
scored: int
|
||||
ready: int
|
||||
decisions: list[TriageDecision] = field(default_factory=list)
|
||||
errors: list[str] = field(default_factory=list)
|
||||
duration_ms: int = 0
|
||||
|
||||
|
||||
# ── Scoring ──────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _extract_tags(title: str, labels: list[str]) -> set[str]:
|
||||
"""Pull tags from [bracket] title notation + Gitea label names."""
|
||||
tags: set[str] = set()
|
||||
for m in _TAG_RE.finditer(title):
|
||||
tags.add(m.group(1).lower().strip())
|
||||
for lbl in labels:
|
||||
tags.add(lbl.lower().strip())
|
||||
return tags
|
||||
|
||||
|
||||
def _score_scope(title: str, body: str, tags: set[str]) -> int:
|
||||
"""0–3: How well-scoped is this issue?"""
|
||||
text = f"{title}\n{body}"
|
||||
score = 0
|
||||
if _FILE_RE.search(text):
|
||||
score += 1
|
||||
if _FUNC_RE.search(text):
|
||||
score += 1
|
||||
clean = _TAG_RE.sub("", title).strip()
|
||||
if len(clean) < 80:
|
||||
score += 1
|
||||
if tags & _META_TAGS:
|
||||
score = max(0, score - 2)
|
||||
return min(3, score)
|
||||
|
||||
|
||||
def _score_acceptance(title: str, body: str, tags: set[str]) -> int:
|
||||
"""0–3: Does this have clear acceptance criteria?"""
|
||||
text = f"{title}\n{body}"
|
||||
score = 0
|
||||
matches = len(_ACCEPT_RE.findall(text))
|
||||
if matches >= 3:
|
||||
score += 2
|
||||
elif matches >= 1:
|
||||
score += 1
|
||||
if _TEST_RE.search(text):
|
||||
score += 1
|
||||
if re.search(r"##\s*(problem|solution|expected|actual|steps)", body, re.IGNORECASE):
|
||||
score += 1
|
||||
if tags & _META_TAGS:
|
||||
score = max(0, score - 1)
|
||||
return min(3, score)
|
||||
|
||||
|
||||
def _score_alignment(title: str, body: str, tags: set[str]) -> int:
|
||||
"""0–3: How aligned is this with the north star?"""
|
||||
score = 0
|
||||
if tags & _BUG_TAGS:
|
||||
return 3
|
||||
if tags & _REFACTOR_TAGS:
|
||||
score += 2
|
||||
if tags & _FEATURE_TAGS:
|
||||
score += 2
|
||||
if _LOOP_TAG in tags:
|
||||
score += 1
|
||||
if tags & _META_TAGS:
|
||||
score = 0
|
||||
return min(3, score)
|
||||
|
||||
|
||||
def score_issue(issue: dict[str, Any]) -> ScoredIssue:
|
||||
"""Score and classify a raw Gitea issue dict."""
|
||||
number = issue["number"]
|
||||
title = issue.get("title", "")
|
||||
body = issue.get("body") or ""
|
||||
label_names = [lbl["name"] for lbl in issue.get("labels", [])]
|
||||
tags = _extract_tags(title, label_names)
|
||||
assignees = [a["login"] for a in issue.get("assignees", [])]
|
||||
|
||||
# Parse created_at
|
||||
raw_ts = issue.get("created_at", "")
|
||||
try:
|
||||
created_at = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
|
||||
except (ValueError, AttributeError):
|
||||
created_at = datetime.now(UTC)
|
||||
age_days = (datetime.now(UTC) - created_at).days
|
||||
|
||||
# Scores
|
||||
scope = _score_scope(title, body, tags)
|
||||
acceptance = _score_acceptance(title, body, tags)
|
||||
alignment = _score_alignment(title, body, tags)
|
||||
total = scope + acceptance + alignment
|
||||
|
||||
# Classify
|
||||
if tags & _BUG_TAGS:
|
||||
issue_type = "bug"
|
||||
elif tags & _RESEARCH_TAGS:
|
||||
issue_type = "research"
|
||||
elif tags & _FEATURE_TAGS:
|
||||
issue_type = "feature"
|
||||
elif tags & _REFACTOR_TAGS:
|
||||
issue_type = "refactor"
|
||||
elif tags & _META_TAGS:
|
||||
issue_type = "philosophy"
|
||||
else:
|
||||
issue_type = "unknown"
|
||||
|
||||
is_p0 = bool(tags & _P0_TAGS) or issue_type == "bug"
|
||||
is_blocked = bool(_BLOCKED_RE.search(title) or _BLOCKED_RE.search(body))
|
||||
|
||||
return ScoredIssue(
|
||||
number=number,
|
||||
title=_TAG_RE.sub("", title).strip(),
|
||||
body=body,
|
||||
labels=label_names,
|
||||
tags=tags,
|
||||
assignees=assignees,
|
||||
created_at=created_at,
|
||||
issue_type=issue_type,
|
||||
score=total,
|
||||
scope=scope,
|
||||
acceptance=acceptance,
|
||||
alignment=alignment,
|
||||
ready=total >= READY_THRESHOLD,
|
||||
age_days=age_days,
|
||||
is_p0=is_p0,
|
||||
is_blocked=is_blocked,
|
||||
)
|
||||
|
||||
|
||||
# ── Decision logic ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def decide(issue: ScoredIssue) -> TriageDecision:
|
||||
"""Decide what to do with an issue.
|
||||
|
||||
Returns a TriageDecision with action, reason, and agent.
|
||||
Decision is not yet executed — call execute_decision() for that.
|
||||
"""
|
||||
num = issue.number
|
||||
|
||||
# Skip philosophy/meta — not dev-actionable
|
||||
if issue.issue_type == "philosophy":
|
||||
return TriageDecision(
|
||||
issue_number=num,
|
||||
action="skip",
|
||||
reason="Philosophy/meta issue — not dev-actionable in the triage loop.",
|
||||
)
|
||||
|
||||
# Skip already-assigned issues
|
||||
if not issue.is_unassigned:
|
||||
return TriageDecision(
|
||||
issue_number=num,
|
||||
action="skip",
|
||||
reason=f"Already assigned to: {', '.join(issue.assignees)}.",
|
||||
)
|
||||
|
||||
# Skip if not ready (low score)
|
||||
if not issue.ready:
|
||||
return TriageDecision(
|
||||
issue_number=num,
|
||||
action="skip",
|
||||
reason=f"Score {issue.score} < {READY_THRESHOLD} threshold — needs more detail before assignment.",
|
||||
)
|
||||
|
||||
# Blocked: flag for Alex
|
||||
if issue.is_blocked:
|
||||
return TriageDecision(
|
||||
issue_number=num,
|
||||
action="flag_alex",
|
||||
agent=OWNER_LOGIN,
|
||||
reason=(
|
||||
"Issue appears blocked. Flagging for @rockachopa to unblock before autonomous assignment."
|
||||
),
|
||||
)
|
||||
|
||||
# Research / Kimi-ready
|
||||
if issue.needs_kimi:
|
||||
return TriageDecision(
|
||||
issue_number=num,
|
||||
action="assign_kimi",
|
||||
agent=AGENT_KIMI,
|
||||
reason=(
|
||||
f"Issue type '{issue.issue_type}' with research/investigation scope. "
|
||||
f"Assigning kimi-ready label for Kimi agent to pick up."
|
||||
),
|
||||
)
|
||||
|
||||
# P0 bugs and blocking issues → Claude immediately
|
||||
if issue.is_p0:
|
||||
return TriageDecision(
|
||||
issue_number=num,
|
||||
action="assign_claude",
|
||||
agent=AGENT_CLAUDE,
|
||||
reason=(
|
||||
f"P0/{issue.issue_type} issue (score={issue.score}, age={issue.age_days}d). "
|
||||
f"Assigning to Claude Code for immediate attention."
|
||||
),
|
||||
)
|
||||
|
||||
# Everything else that is ready → Claude Code
|
||||
return TriageDecision(
|
||||
issue_number=num,
|
||||
action="assign_claude",
|
||||
agent=AGENT_CLAUDE,
|
||||
reason=(
|
||||
f"Unassigned ready issue (type={issue.issue_type}, score={issue.score}, "
|
||||
f"age={issue.age_days}d). Assigning to Claude Code."
|
||||
),
|
||||
)
|
||||
|
||||
|
||||
# ── Gitea API client ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _api_headers() -> dict[str, str]:
|
||||
return {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
"Accept": "application/json",
|
||||
}
|
||||
|
||||
|
||||
def _repo_url(path: str) -> str:
|
||||
owner, repo = settings.gitea_repo.split("/", 1)
|
||||
return f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/{path}"
|
||||
|
||||
|
||||
async def fetch_open_issues(client: httpx.AsyncClient) -> list[dict[str, Any]]:
|
||||
"""Fetch all open issues from Gitea, paginating as needed."""
|
||||
all_issues: list[dict[str, Any]] = []
|
||||
page = 1
|
||||
while True:
|
||||
url = _repo_url(f"issues?state=open&type=issues&limit=50&page={page}")
|
||||
try:
|
||||
resp = await client.get(url, headers=_api_headers())
|
||||
if resp.status_code != 200:
|
||||
logger.warning("Gitea issues fetch failed (HTTP %s)", resp.status_code)
|
||||
break
|
||||
batch: list[dict[str, Any]] = resp.json()
|
||||
if not batch:
|
||||
break
|
||||
all_issues.extend(batch)
|
||||
if len(batch) < 50:
|
||||
break
|
||||
page += 1
|
||||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||||
logger.warning("Gitea connection error fetching issues: %s", exc)
|
||||
break
|
||||
return all_issues
|
||||
|
||||
|
||||
async def post_comment(
|
||||
client: httpx.AsyncClient,
|
||||
issue_number: int,
|
||||
body: str,
|
||||
) -> bool:
|
||||
"""Post a comment on a Gitea issue. Returns True on success."""
|
||||
url = _repo_url(f"issues/{issue_number}/comments")
|
||||
try:
|
||||
resp = await client.post(url, headers=_api_headers(), json={"body": body})
|
||||
return resp.status_code in (200, 201)
|
||||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||||
logger.warning("Failed to post comment on #%d: %s", issue_number, exc)
|
||||
return False
|
||||
|
||||
|
||||
async def assign_issue(
|
||||
client: httpx.AsyncClient,
|
||||
issue_number: int,
|
||||
assignee: str,
|
||||
) -> bool:
|
||||
"""Assign an issue to a Gitea user. Returns True on success."""
|
||||
url = _repo_url(f"issues/{issue_number}")
|
||||
try:
|
||||
resp = await client.patch(
|
||||
url,
|
||||
headers=_api_headers(),
|
||||
json={"assignees": [assignee]},
|
||||
)
|
||||
return resp.status_code in (200, 201)
|
||||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||||
logger.warning("Failed to assign #%d to %s: %s", issue_number, assignee, exc)
|
||||
return False
|
||||
|
||||
|
||||
async def add_label(
|
||||
client: httpx.AsyncClient,
|
||||
issue_number: int,
|
||||
label_name: str,
|
||||
) -> bool:
|
||||
"""Add a label to a Gitea issue by name (auto-creates if missing). Returns True on success."""
|
||||
owner, repo = settings.gitea_repo.split("/", 1)
|
||||
labels_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/labels"
|
||||
headers = _api_headers()
|
||||
|
||||
try:
|
||||
# Fetch existing labels
|
||||
resp = await client.get(labels_url, headers=headers)
|
||||
if resp.status_code != 200:
|
||||
return False
|
||||
existing = {lbl["name"]: lbl["id"] for lbl in resp.json()}
|
||||
|
||||
if label_name in existing:
|
||||
label_id = existing[label_name]
|
||||
else:
|
||||
# Auto-create the label
|
||||
create_resp = await client.post(
|
||||
labels_url,
|
||||
headers=headers,
|
||||
json={"name": label_name, "color": "#006b75"},
|
||||
)
|
||||
if create_resp.status_code not in (200, 201):
|
||||
return False
|
||||
label_id = create_resp.json()["id"]
|
||||
|
||||
# Apply to the issue
|
||||
apply_url = _repo_url(f"issues/{issue_number}/labels")
|
||||
apply_resp = await client.post(
|
||||
apply_url, headers=headers, json={"labels": [label_id]}
|
||||
)
|
||||
return apply_resp.status_code in (200, 201)
|
||||
|
||||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||||
logger.warning("Failed to add label %r to #%d: %s", label_name, issue_number, exc)
|
||||
return False
|
||||
|
||||
|
||||
# ── Decision execution ───────────────────────────────────────────────────────
|
||||
|
||||
|
||||
async def execute_decision(
|
||||
client: httpx.AsyncClient,
|
||||
decision: TriageDecision,
|
||||
dry_run: bool = False,
|
||||
) -> TriageDecision:
|
||||
"""Execute a triage decision — comment + assign/label.
|
||||
|
||||
When dry_run=True, logs the decision but makes no Gitea API calls.
|
||||
Returns the updated decision with executed=True on success.
|
||||
"""
|
||||
num = decision.issue_number
|
||||
|
||||
if decision.action == "skip":
|
||||
logger.debug("Triage skip #%d: %s", num, decision.reason)
|
||||
decision.executed = True
|
||||
return decision
|
||||
|
||||
audit_comment = _build_audit_comment(decision)
|
||||
|
||||
if dry_run:
|
||||
logger.info(
|
||||
"[DRY RUN] #%d → %s (%s): %s",
|
||||
num,
|
||||
decision.action,
|
||||
decision.agent,
|
||||
decision.reason,
|
||||
)
|
||||
decision.executed = True
|
||||
return decision
|
||||
|
||||
# Post audit comment first (always, so Alex can see reasoning)
|
||||
comment_ok = await post_comment(client, num, audit_comment)
|
||||
if not comment_ok:
|
||||
decision.error = "Failed to post audit comment"
|
||||
logger.warning("Triage #%d: comment failed", num)
|
||||
return decision
|
||||
|
||||
# Execute assignment
|
||||
ok = False
|
||||
if decision.action == "assign_claude":
|
||||
ok = await assign_issue(client, num, AGENT_CLAUDE)
|
||||
elif decision.action == "assign_kimi":
|
||||
ok = await add_label(client, num, KIMI_READY_LABEL)
|
||||
elif decision.action == "flag_alex":
|
||||
# Comment already posted above — that's sufficient for flagging
|
||||
ok = True
|
||||
|
||||
if ok:
|
||||
decision.executed = True
|
||||
logger.info("Triage #%d → %s OK", num, decision.action)
|
||||
else:
|
||||
decision.error = f"Action {decision.action!r} failed"
|
||||
logger.warning("Triage #%d: action %r failed", num, decision.action)
|
||||
|
||||
return decision
|
||||
|
||||
|
||||
def _build_audit_comment(decision: TriageDecision) -> str:
|
||||
"""Build the audit trail comment that Alex can read to see reasoning."""
|
||||
ts = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
|
||||
action_text = {
|
||||
"assign_claude": f"Assigning to @{AGENT_CLAUDE} for implementation.",
|
||||
"assign_kimi": f"Adding `{KIMI_READY_LABEL}` label — queuing for Kimi research agent.",
|
||||
"flag_alex": f"Flagging for @{OWNER_LOGIN} — issue appears blocked or needs human decision.",
|
||||
}.get(decision.action, decision.action)
|
||||
|
||||
return (
|
||||
f"**[Timmy Triage — {ts}]**\n\n"
|
||||
f"**Decision:** {action_text}\n\n"
|
||||
f"**Why:** {decision.reason}\n\n"
|
||||
f"*Autonomous triage by Timmy. Reply to override.*"
|
||||
)
|
||||
|
||||
|
||||
# ── Daily summary ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _build_daily_summary(result: TriageCycleResult, scored: list[ScoredIssue]) -> str:
|
||||
"""Build the daily triage summary body."""
|
||||
now = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
|
||||
assigned = [d for d in result.decisions if d.executed and d.action != "skip"]
|
||||
skipped = [d for d in result.decisions if d.action == "skip"]
|
||||
|
||||
lines = [
|
||||
f"# Timmy Backlog Triage — {now}",
|
||||
"",
|
||||
f"**Open issues:** {result.total_open} | "
|
||||
f"**Scored:** {result.scored} | "
|
||||
f"**Ready:** {result.ready} | "
|
||||
f"**Assigned this cycle:** {len(assigned)}",
|
||||
"",
|
||||
"## Top 10 Ready Issues (by score)",
|
||||
"",
|
||||
]
|
||||
|
||||
top = sorted([s for s in scored if s.ready], key=lambda s: (-s.score, s.number))[:10]
|
||||
for s in top:
|
||||
flag = "🐛" if s.issue_type == "bug" else "⚡" if s.is_p0 else "✦"
|
||||
lines.append(
|
||||
f"- {flag} **#{s.number}** (score={s.score}, age={s.age_days}d) — {s.title[:80]}"
|
||||
)
|
||||
|
||||
if assigned:
|
||||
lines += ["", "## Actions Taken", ""]
|
||||
for d in assigned:
|
||||
lines.append(f"- #{d.issue_number} → `{d.action}` ({d.agent}): {d.reason[:100]}")
|
||||
|
||||
if skipped:
|
||||
lines += ["", f"## Skipped ({len(skipped)} issues)", ""]
|
||||
for d in skipped[:5]:
|
||||
lines.append(f"- #{d.issue_number}: {d.reason[:80]}")
|
||||
if len(skipped) > 5:
|
||||
lines.append(f"- … and {len(skipped) - 5} more")
|
||||
|
||||
lines += [
|
||||
"",
|
||||
"---",
|
||||
"*Auto-generated by Timmy's backlog triage loop. "
|
||||
"Override any decision by reassigning or commenting.*",
|
||||
]
|
||||
return "\n".join(lines)
|
||||
|
||||
|
||||
async def post_daily_summary(
|
||||
client: httpx.AsyncClient,
|
||||
result: TriageCycleResult,
|
||||
scored: list[ScoredIssue],
|
||||
dry_run: bool = False,
|
||||
) -> bool:
|
||||
"""Post a daily triage summary as a new Gitea issue."""
|
||||
today = datetime.now(UTC).strftime("%Y-%m-%d")
|
||||
title = f"[Triage] Daily backlog summary — {today}"
|
||||
body = _build_daily_summary(result, scored)
|
||||
|
||||
if dry_run:
|
||||
logger.info("[DRY RUN] Would post daily summary: %s", title)
|
||||
return True
|
||||
|
||||
url = _repo_url("issues")
|
||||
try:
|
||||
resp = await client.post(
|
||||
url,
|
||||
headers=_api_headers(),
|
||||
json={
|
||||
"title": title,
|
||||
"body": body,
|
||||
"labels": [],
|
||||
},
|
||||
)
|
||||
if resp.status_code in (200, 201):
|
||||
issue_num = resp.json().get("number", "?")
|
||||
logger.info("Daily triage summary posted as issue #%s", issue_num)
|
||||
return True
|
||||
logger.warning("Daily summary post failed (HTTP %s)", resp.status_code)
|
||||
return False
|
||||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||||
logger.warning("Failed to post daily summary: %s", exc)
|
||||
return False
|
||||
|
||||
|
||||
# ── Main loop class ───────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class BacklogTriageLoop:
|
||||
"""Autonomous backlog triage loop.
|
||||
|
||||
Fetches, scores, and assigns Gitea issues on a configurable interval.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
interval:
|
||||
Seconds between triage cycles. Default: settings.backlog_triage_interval_seconds.
|
||||
dry_run:
|
||||
When True, score and log decisions but don't write to Gitea.
|
||||
daily_summary:
|
||||
When True, post a daily triage summary issue after each cycle.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
*,
|
||||
interval: float | None = None,
|
||||
dry_run: bool | None = None,
|
||||
daily_summary: bool | None = None,
|
||||
) -> None:
|
||||
self._interval = float(interval or settings.backlog_triage_interval_seconds)
|
||||
self._dry_run = dry_run if dry_run is not None else settings.backlog_triage_dry_run
|
||||
self._daily_summary = (
|
||||
daily_summary if daily_summary is not None else settings.backlog_triage_daily_summary
|
||||
)
|
||||
self._running = False
|
||||
self._task: asyncio.Task | None = None
|
||||
self._cycle_count = 0
|
||||
self._last_summary_date: str = ""
|
||||
self.history: list[TriageCycleResult] = []
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._running
|
||||
|
||||
@property
|
||||
def cycle_count(self) -> int:
|
||||
return self._cycle_count
|
||||
|
||||
async def run_once(self) -> TriageCycleResult:
|
||||
"""Execute one full triage cycle.
|
||||
|
||||
1. Fetch all open Gitea issues
|
||||
2. Score and prioritize
|
||||
3. Decide on each unassigned ready issue
|
||||
4. Execute decisions
|
||||
5. Optionally post daily summary
|
||||
"""
|
||||
import time
|
||||
|
||||
self._cycle_count += 1
|
||||
start = time.monotonic()
|
||||
ts = datetime.now(UTC).isoformat()
|
||||
result = TriageCycleResult(timestamp=ts, total_open=0, scored=0, ready=0)
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
logger.warning("Backlog triage: Gitea not configured — skipping cycle")
|
||||
return result
|
||||
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
# 1. Fetch
|
||||
raw_issues = await fetch_open_issues(client)
|
||||
result.total_open = len(raw_issues)
|
||||
logger.info("Triage cycle #%d: fetched %d open issues", self._cycle_count, len(raw_issues))
|
||||
|
||||
# 2. Score
|
||||
scored = [score_issue(i) for i in raw_issues]
|
||||
result.scored = len(scored)
|
||||
result.ready = sum(1 for s in scored if s.ready)
|
||||
|
||||
# 3 & 4. Decide and execute for each issue
|
||||
for issue in scored:
|
||||
decision = decide(issue)
|
||||
if decision.action == "skip":
|
||||
result.decisions.append(decision)
|
||||
continue
|
||||
decision = await execute_decision(client, decision, dry_run=self._dry_run)
|
||||
result.decisions.append(decision)
|
||||
|
||||
# Rate-limit: short pause between API writes to avoid hammering Gitea
|
||||
if not self._dry_run:
|
||||
await asyncio.sleep(0.5)
|
||||
|
||||
# 5. Daily summary (once per UTC day)
|
||||
today = datetime.now(UTC).strftime("%Y-%m-%d")
|
||||
if self._daily_summary and today != self._last_summary_date:
|
||||
await post_daily_summary(client, result, scored, dry_run=self._dry_run)
|
||||
self._last_summary_date = today
|
||||
|
||||
result.duration_ms = int((time.monotonic() - start) * 1000)
|
||||
self.history.append(result)
|
||||
|
||||
assigned_count = sum(1 for d in result.decisions if d.executed and d.action != "skip")
|
||||
logger.info(
|
||||
"Triage cycle #%d complete (%d ms): %d open, %d ready, %d assigned",
|
||||
self._cycle_count,
|
||||
result.duration_ms,
|
||||
result.total_open,
|
||||
result.ready,
|
||||
assigned_count,
|
||||
)
|
||||
return result
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the triage loop as a background task."""
|
||||
if self._running:
|
||||
logger.warning("BacklogTriageLoop already running")
|
||||
return
|
||||
self._running = True
|
||||
await self._loop()
|
||||
|
||||
async def _loop(self) -> None:
|
||||
logger.info(
|
||||
"BacklogTriageLoop started (interval=%.0fs, dry_run=%s)",
|
||||
self._interval,
|
||||
self._dry_run,
|
||||
)
|
||||
while self._running:
|
||||
try:
|
||||
await self.run_once()
|
||||
except Exception:
|
||||
logger.exception("Backlog triage cycle failed")
|
||||
await asyncio.sleep(self._interval)
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Signal the loop to stop after the current cycle."""
|
||||
self._running = False
|
||||
logger.info("BacklogTriageLoop stop requested")
|
||||
801
src/timmy/dispatcher.py
Normal file
801
src/timmy/dispatcher.py
Normal file
@@ -0,0 +1,801 @@
|
||||
"""Agent dispatcher — route tasks to Claude Code, Kimi, APIs, or Timmy itself.
|
||||
|
||||
Timmy's dispatch system: knows what agents are available, what they're good
|
||||
at, and how to send them work. Uses Gitea labels and issue comments to assign
|
||||
tasks and track completion.
|
||||
|
||||
Dispatch flow:
|
||||
1. Match task type to agent strengths
|
||||
2. Check agent availability (idle or working?)
|
||||
3. Dispatch task with full context (issue link, requirements, criteria)
|
||||
4. Log assignment as a Gitea comment
|
||||
5. Monitor for completion or timeout
|
||||
6. Review output quality
|
||||
7. If output fails QA → reassign or escalate
|
||||
|
||||
Agent interfaces:
|
||||
- Claude Code → ``claude-ready`` Gitea label + issue comment
|
||||
- Kimi Code → ``kimi-ready`` Gitea label + issue comment
|
||||
- Agent APIs → HTTP POST to external endpoint
|
||||
- Timmy (self) → direct local invocation
|
||||
|
||||
Usage::
|
||||
|
||||
from timmy.dispatcher import dispatch_task, TaskType, AgentType
|
||||
|
||||
result = await dispatch_task(
|
||||
issue_number=1072,
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
title="Design the LLM router",
|
||||
description="We need a cascade router...",
|
||||
acceptance_criteria=["Failover works", "Metrics exposed"],
|
||||
)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from enum import Enum
|
||||
from typing import Any
|
||||
|
||||
from config import settings
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Enumerations
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class AgentType(str, Enum):
|
||||
"""Known agents in the swarm."""
|
||||
|
||||
CLAUDE_CODE = "claude_code"
|
||||
KIMI_CODE = "kimi_code"
|
||||
AGENT_API = "agent_api"
|
||||
TIMMY = "timmy"
|
||||
|
||||
|
||||
class TaskType(str, Enum):
|
||||
"""Categories of engineering work."""
|
||||
|
||||
# Claude Code strengths
|
||||
ARCHITECTURE = "architecture"
|
||||
REFACTORING = "refactoring"
|
||||
COMPLEX_REASONING = "complex_reasoning"
|
||||
CODE_REVIEW = "code_review"
|
||||
|
||||
# Kimi Code strengths
|
||||
PARALLEL_IMPLEMENTATION = "parallel_implementation"
|
||||
ROUTINE_CODING = "routine_coding"
|
||||
FAST_ITERATION = "fast_iteration"
|
||||
|
||||
# Agent API strengths
|
||||
RESEARCH = "research"
|
||||
ANALYSIS = "analysis"
|
||||
SPECIALIZED = "specialized"
|
||||
|
||||
# Timmy strengths
|
||||
TRIAGE = "triage"
|
||||
PLANNING = "planning"
|
||||
CREATIVE = "creative"
|
||||
ORCHESTRATION = "orchestration"
|
||||
|
||||
|
||||
class DispatchStatus(str, Enum):
|
||||
"""Lifecycle state of a dispatched task."""
|
||||
|
||||
PENDING = "pending"
|
||||
ASSIGNED = "assigned"
|
||||
IN_PROGRESS = "in_progress"
|
||||
COMPLETED = "completed"
|
||||
FAILED = "failed"
|
||||
ESCALATED = "escalated"
|
||||
TIMED_OUT = "timed_out"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class AgentSpec:
|
||||
"""Capabilities and limits for a single agent."""
|
||||
|
||||
name: AgentType
|
||||
display_name: str
|
||||
strengths: frozenset[TaskType]
|
||||
gitea_label: str | None # label to apply when dispatching
|
||||
max_concurrent: int = 1
|
||||
interface: str = "gitea" # "gitea" | "api" | "local"
|
||||
api_endpoint: str | None = None # for interface="api"
|
||||
|
||||
|
||||
#: Authoritative agent registry — all known agents and their capabilities.
|
||||
AGENT_REGISTRY: dict[AgentType, AgentSpec] = {
|
||||
AgentType.CLAUDE_CODE: AgentSpec(
|
||||
name=AgentType.CLAUDE_CODE,
|
||||
display_name="Claude Code",
|
||||
strengths=frozenset(
|
||||
{
|
||||
TaskType.ARCHITECTURE,
|
||||
TaskType.REFACTORING,
|
||||
TaskType.COMPLEX_REASONING,
|
||||
TaskType.CODE_REVIEW,
|
||||
}
|
||||
),
|
||||
gitea_label="claude-ready",
|
||||
max_concurrent=1,
|
||||
interface="gitea",
|
||||
),
|
||||
AgentType.KIMI_CODE: AgentSpec(
|
||||
name=AgentType.KIMI_CODE,
|
||||
display_name="Kimi Code",
|
||||
strengths=frozenset(
|
||||
{
|
||||
TaskType.PARALLEL_IMPLEMENTATION,
|
||||
TaskType.ROUTINE_CODING,
|
||||
TaskType.FAST_ITERATION,
|
||||
}
|
||||
),
|
||||
gitea_label="kimi-ready",
|
||||
max_concurrent=1,
|
||||
interface="gitea",
|
||||
),
|
||||
AgentType.AGENT_API: AgentSpec(
|
||||
name=AgentType.AGENT_API,
|
||||
display_name="Agent API",
|
||||
strengths=frozenset(
|
||||
{
|
||||
TaskType.RESEARCH,
|
||||
TaskType.ANALYSIS,
|
||||
TaskType.SPECIALIZED,
|
||||
}
|
||||
),
|
||||
gitea_label=None,
|
||||
max_concurrent=5,
|
||||
interface="api",
|
||||
),
|
||||
AgentType.TIMMY: AgentSpec(
|
||||
name=AgentType.TIMMY,
|
||||
display_name="Timmy",
|
||||
strengths=frozenset(
|
||||
{
|
||||
TaskType.TRIAGE,
|
||||
TaskType.PLANNING,
|
||||
TaskType.CREATIVE,
|
||||
TaskType.ORCHESTRATION,
|
||||
}
|
||||
),
|
||||
gitea_label=None,
|
||||
max_concurrent=1,
|
||||
interface="local",
|
||||
),
|
||||
}
|
||||
|
||||
#: Map from task type to preferred agent (primary routing table).
|
||||
_TASK_ROUTING: dict[TaskType, AgentType] = {
|
||||
TaskType.ARCHITECTURE: AgentType.CLAUDE_CODE,
|
||||
TaskType.REFACTORING: AgentType.CLAUDE_CODE,
|
||||
TaskType.COMPLEX_REASONING: AgentType.CLAUDE_CODE,
|
||||
TaskType.CODE_REVIEW: AgentType.CLAUDE_CODE,
|
||||
TaskType.PARALLEL_IMPLEMENTATION: AgentType.KIMI_CODE,
|
||||
TaskType.ROUTINE_CODING: AgentType.KIMI_CODE,
|
||||
TaskType.FAST_ITERATION: AgentType.KIMI_CODE,
|
||||
TaskType.RESEARCH: AgentType.AGENT_API,
|
||||
TaskType.ANALYSIS: AgentType.AGENT_API,
|
||||
TaskType.SPECIALIZED: AgentType.AGENT_API,
|
||||
TaskType.TRIAGE: AgentType.TIMMY,
|
||||
TaskType.PLANNING: AgentType.TIMMY,
|
||||
TaskType.CREATIVE: AgentType.TIMMY,
|
||||
TaskType.ORCHESTRATION: AgentType.TIMMY,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatch result
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
@dataclass
|
||||
class DispatchResult:
|
||||
"""Outcome of a dispatch call."""
|
||||
|
||||
task_type: TaskType
|
||||
agent: AgentType
|
||||
issue_number: int | None
|
||||
status: DispatchStatus
|
||||
comment_id: int | None = None
|
||||
label_applied: str | None = None
|
||||
error: str | None = None
|
||||
retry_count: int = 0
|
||||
metadata: dict[str, Any] = field(default_factory=dict)
|
||||
|
||||
@property
|
||||
def success(self) -> bool: # noqa: D401
|
||||
return self.status in (DispatchStatus.ASSIGNED, DispatchStatus.COMPLETED)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Routing logic
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
def select_agent(task_type: TaskType) -> AgentType:
|
||||
"""Return the best agent for *task_type* based on the routing table.
|
||||
|
||||
Args:
|
||||
task_type: The category of engineering work to be done.
|
||||
|
||||
Returns:
|
||||
The :class:`AgentType` best suited to handle this task.
|
||||
"""
|
||||
return _TASK_ROUTING.get(task_type, AgentType.TIMMY)
|
||||
|
||||
|
||||
def infer_task_type(title: str, description: str = "") -> TaskType:
|
||||
"""Heuristic: guess the most appropriate :class:`TaskType` from text.
|
||||
|
||||
Scans *title* and *description* for keyword signals and returns the
|
||||
strongest match. Falls back to :attr:`TaskType.ROUTINE_CODING`.
|
||||
|
||||
Args:
|
||||
title: Short task title.
|
||||
description: Longer task description (optional).
|
||||
|
||||
Returns:
|
||||
The inferred :class:`TaskType`.
|
||||
"""
|
||||
text = (title + " " + description).lower()
|
||||
|
||||
_SIGNALS: list[tuple[TaskType, frozenset[str]]] = [
|
||||
(TaskType.ARCHITECTURE, frozenset({"architect", "design", "adr", "system design", "schema"})),
|
||||
(TaskType.REFACTORING, frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"})),
|
||||
(TaskType.CODE_REVIEW, frozenset({"review", "pr review", "pull request review", "audit"})),
|
||||
(TaskType.COMPLEX_REASONING, frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"})),
|
||||
(TaskType.RESEARCH, frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"})),
|
||||
(TaskType.ANALYSIS, frozenset({"analysis", "profil", "trace", "metric", "performance"})),
|
||||
(TaskType.TRIAGE, frozenset({"triage", "classify", "prioritise", "prioritize"})),
|
||||
(TaskType.PLANNING, frozenset({"plan", "roadmap", "milestone", "epic", "spike"})),
|
||||
(TaskType.CREATIVE, frozenset({"creative", "persona", "story", "write", "draft"})),
|
||||
(TaskType.ORCHESTRATION, frozenset({"orchestrat", "coordinat", "swarm", "dispatch"})),
|
||||
(TaskType.PARALLEL_IMPLEMENTATION, frozenset({"parallel", "concurrent", "batch"})),
|
||||
(TaskType.FAST_ITERATION, frozenset({"quick", "fast", "iterate", "prototype", "poc"})),
|
||||
]
|
||||
|
||||
for task_type, keywords in _SIGNALS:
|
||||
if any(kw in text for kw in keywords):
|
||||
return task_type
|
||||
|
||||
return TaskType.ROUTINE_CODING
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _post_gitea_comment(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
repo: str,
|
||||
headers: dict[str, str],
|
||||
issue_number: int,
|
||||
body: str,
|
||||
) -> int | None:
|
||||
"""Post a comment on a Gitea issue and return the comment ID."""
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
|
||||
headers=headers,
|
||||
json={"body": body},
|
||||
)
|
||||
if resp.status_code in (200, 201):
|
||||
return resp.json().get("id")
|
||||
logger.warning(
|
||||
"Comment on #%s returned %s: %s",
|
||||
issue_number,
|
||||
resp.status_code,
|
||||
resp.text[:200],
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to post comment on #%s: %s", issue_number, exc)
|
||||
return None
|
||||
|
||||
|
||||
async def _apply_gitea_label(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
repo: str,
|
||||
headers: dict[str, str],
|
||||
issue_number: int,
|
||||
label_name: str,
|
||||
label_color: str = "#0075ca",
|
||||
) -> bool:
|
||||
"""Ensure *label_name* exists and apply it to an issue.
|
||||
|
||||
Returns True if the label was successfully applied.
|
||||
"""
|
||||
# Resolve or create the label
|
||||
label_id: int | None = None
|
||||
try:
|
||||
resp = await client.get(f"{base_url}/repos/{repo}/labels", headers=headers)
|
||||
if resp.status_code == 200:
|
||||
for lbl in resp.json():
|
||||
if lbl.get("name") == label_name:
|
||||
label_id = lbl["id"]
|
||||
break
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to list labels: %s", exc)
|
||||
return False
|
||||
|
||||
if label_id is None:
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/labels",
|
||||
headers=headers,
|
||||
json={"name": label_name, "color": label_color},
|
||||
)
|
||||
if resp.status_code in (200, 201):
|
||||
label_id = resp.json().get("id")
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to create label %r: %s", label_name, exc)
|
||||
return False
|
||||
|
||||
if label_id is None:
|
||||
return False
|
||||
|
||||
# Apply label to the issue
|
||||
try:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
|
||||
headers=headers,
|
||||
json={"labels": [label_id]},
|
||||
)
|
||||
return resp.status_code in (200, 201)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to apply label %r to #%s: %s", label_name, issue_number, exc)
|
||||
return False
|
||||
|
||||
|
||||
async def _poll_issue_completion(
|
||||
issue_number: int,
|
||||
poll_interval: int = 60,
|
||||
max_wait: int = 7200,
|
||||
) -> DispatchStatus:
|
||||
"""Poll a Gitea issue until closed (completed) or timeout.
|
||||
|
||||
Args:
|
||||
issue_number: Gitea issue to watch.
|
||||
poll_interval: Seconds between polls.
|
||||
max_wait: Maximum total seconds to wait.
|
||||
|
||||
Returns:
|
||||
:attr:`DispatchStatus.COMPLETED` if the issue was closed,
|
||||
:attr:`DispatchStatus.TIMED_OUT` otherwise.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
except ImportError as exc:
|
||||
logger.warning("poll_issue_completion: missing dependency: %s", exc)
|
||||
return DispatchStatus.FAILED
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"
|
||||
|
||||
elapsed = 0
|
||||
while elapsed < max_wait:
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.get(issue_url, headers=headers)
|
||||
if resp.status_code == 200 and resp.json().get("state") == "closed":
|
||||
logger.info("Issue #%s closed — task completed", issue_number)
|
||||
return DispatchStatus.COMPLETED
|
||||
except Exception as exc:
|
||||
logger.warning("Poll error for issue #%s: %s", issue_number, exc)
|
||||
|
||||
await asyncio.sleep(poll_interval)
|
||||
elapsed += poll_interval
|
||||
|
||||
logger.warning("Timed out waiting for issue #%s after %ss", issue_number, max_wait)
|
||||
return DispatchStatus.TIMED_OUT
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Core dispatch functions
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def _dispatch_via_gitea(
|
||||
agent: AgentType,
|
||||
issue_number: int,
|
||||
title: str,
|
||||
description: str,
|
||||
acceptance_criteria: list[str],
|
||||
) -> DispatchResult:
|
||||
"""Assign a task by applying a Gitea label and posting an assignment comment.
|
||||
|
||||
Args:
|
||||
agent: Target agent.
|
||||
issue_number: Gitea issue to assign.
|
||||
title: Short task title.
|
||||
description: Full task description.
|
||||
acceptance_criteria: List of acceptance criteria strings.
|
||||
|
||||
Returns:
|
||||
:class:`DispatchResult` describing the outcome.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
except ImportError as exc:
|
||||
return DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error=f"Missing dependency: {exc}",
|
||||
)
|
||||
|
||||
spec = AGENT_REGISTRY[agent]
|
||||
task_type = infer_task_type(title, description)
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error="Gitea integration not configured (no token or disabled).",
|
||||
)
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
|
||||
comment_id: int | None = None
|
||||
label_applied: str | None = None
|
||||
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
# 1. Apply agent label (if applicable)
|
||||
if spec.gitea_label:
|
||||
ok = await _apply_gitea_label(
|
||||
client, base_url, repo, headers, issue_number, spec.gitea_label
|
||||
)
|
||||
if ok:
|
||||
label_applied = spec.gitea_label
|
||||
logger.info(
|
||||
"Applied label %r to issue #%s for %s",
|
||||
spec.gitea_label,
|
||||
issue_number,
|
||||
spec.display_name,
|
||||
)
|
||||
else:
|
||||
logger.warning(
|
||||
"Could not apply label %r to issue #%s",
|
||||
spec.gitea_label,
|
||||
issue_number,
|
||||
)
|
||||
|
||||
# 2. Post assignment comment
|
||||
criteria_md = "\n".join(f"- {c}" for c in acceptance_criteria) if acceptance_criteria else "_None specified_"
|
||||
comment_body = (
|
||||
f"## Assigned to {spec.display_name}\n\n"
|
||||
f"**Task type:** `{task_type.value}`\n\n"
|
||||
f"**Description:**\n{description}\n\n"
|
||||
f"**Acceptance criteria:**\n{criteria_md}\n\n"
|
||||
f"---\n*Dispatched by Timmy agent dispatcher.*"
|
||||
)
|
||||
comment_id = await _post_gitea_comment(
|
||||
client, base_url, repo, headers, issue_number, comment_body
|
||||
)
|
||||
|
||||
if comment_id is not None or label_applied is not None:
|
||||
logger.info(
|
||||
"Dispatched issue #%s to %s (label=%r, comment=%s)",
|
||||
issue_number,
|
||||
spec.display_name,
|
||||
label_applied,
|
||||
comment_id,
|
||||
)
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
comment_id=comment_id,
|
||||
label_applied=label_applied,
|
||||
)
|
||||
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error="Failed to apply label and post comment — check Gitea connectivity.",
|
||||
)
|
||||
|
||||
|
||||
async def _dispatch_via_api(
|
||||
agent: AgentType,
|
||||
title: str,
|
||||
description: str,
|
||||
acceptance_criteria: list[str],
|
||||
issue_number: int | None = None,
|
||||
endpoint: str | None = None,
|
||||
) -> DispatchResult:
|
||||
"""Dispatch a task to an external HTTP API agent.
|
||||
|
||||
Args:
|
||||
agent: Target agent.
|
||||
title: Short task title.
|
||||
description: Task description.
|
||||
acceptance_criteria: List of acceptance criteria.
|
||||
issue_number: Optional Gitea issue for cross-referencing.
|
||||
endpoint: Override API endpoint URL (uses spec default if omitted).
|
||||
|
||||
Returns:
|
||||
:class:`DispatchResult` describing the outcome.
|
||||
"""
|
||||
spec = AGENT_REGISTRY[agent]
|
||||
task_type = infer_task_type(title, description)
|
||||
url = endpoint or spec.api_endpoint
|
||||
|
||||
if not url:
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error=f"No API endpoint configured for agent {agent.value}.",
|
||||
)
|
||||
|
||||
payload = {
|
||||
"title": title,
|
||||
"description": description,
|
||||
"acceptance_criteria": acceptance_criteria,
|
||||
"issue_number": issue_number,
|
||||
"agent": agent.value,
|
||||
"task_type": task_type.value,
|
||||
}
|
||||
|
||||
try:
|
||||
import httpx
|
||||
|
||||
async with httpx.AsyncClient(timeout=30) as client:
|
||||
resp = await client.post(url, json=payload)
|
||||
|
||||
if resp.status_code in (200, 201, 202):
|
||||
logger.info("Dispatched %r to API agent %s at %s", title[:60], agent.value, url)
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
metadata={"response": resp.json() if resp.content else {}},
|
||||
)
|
||||
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error=f"API agent returned {resp.status_code}: {resp.text[:200]}",
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("API dispatch to %s failed: %s", url, exc)
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=agent,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error=str(exc),
|
||||
)
|
||||
|
||||
|
||||
async def _dispatch_local(
|
||||
title: str,
|
||||
description: str = "",
|
||||
acceptance_criteria: list[str] | None = None,
|
||||
issue_number: int | None = None,
|
||||
) -> DispatchResult:
|
||||
"""Handle a task locally — Timmy processes it directly.
|
||||
|
||||
This is a lightweight stub. Real local execution should be wired
|
||||
into the agentic loop or a dedicated Timmy tool.
|
||||
|
||||
Args:
|
||||
title: Short task title.
|
||||
description: Task description.
|
||||
acceptance_criteria: Acceptance criteria list.
|
||||
issue_number: Optional Gitea issue number for logging.
|
||||
|
||||
Returns:
|
||||
:class:`DispatchResult` with ASSIGNED status (local execution is
|
||||
assumed to succeed at dispatch time).
|
||||
"""
|
||||
task_type = infer_task_type(title, description)
|
||||
logger.info(
|
||||
"Timmy handling task locally: %r (issue #%s)", title[:60], issue_number
|
||||
)
|
||||
return DispatchResult(
|
||||
task_type=task_type,
|
||||
agent=AgentType.TIMMY,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
metadata={"local": True, "description": description},
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public entry point
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def dispatch_task(
|
||||
title: str,
|
||||
description: str = "",
|
||||
acceptance_criteria: list[str] | None = None,
|
||||
task_type: TaskType | None = None,
|
||||
agent: AgentType | None = None,
|
||||
issue_number: int | None = None,
|
||||
api_endpoint: str | None = None,
|
||||
max_retries: int = 1,
|
||||
) -> DispatchResult:
|
||||
"""Route a task to the best available agent.
|
||||
|
||||
This is the primary entry point. Callers can either specify the
|
||||
*agent* and *task_type* explicitly or let the dispatcher infer them
|
||||
from the *title* and *description*.
|
||||
|
||||
Args:
|
||||
title: Short human-readable task title.
|
||||
description: Full task description with context.
|
||||
acceptance_criteria: List of acceptance criteria strings.
|
||||
task_type: Override automatic task type inference.
|
||||
agent: Override automatic agent selection.
|
||||
issue_number: Gitea issue number to log the assignment on.
|
||||
api_endpoint: Override API endpoint for AGENT_API dispatches.
|
||||
max_retries: Number of retry attempts on failure (default 1).
|
||||
|
||||
Returns:
|
||||
:class:`DispatchResult` describing the final dispatch outcome.
|
||||
|
||||
Example::
|
||||
|
||||
result = await dispatch_task(
|
||||
issue_number=1072,
|
||||
title="Build the cascade LLM router",
|
||||
description="We need automatic failover...",
|
||||
acceptance_criteria=["Circuit breaker works", "Metrics exposed"],
|
||||
)
|
||||
if result.success:
|
||||
print(f"Assigned to {result.agent.value}")
|
||||
"""
|
||||
criteria = acceptance_criteria or []
|
||||
|
||||
if not title.strip():
|
||||
return DispatchResult(
|
||||
task_type=task_type or TaskType.ROUTINE_CODING,
|
||||
agent=agent or AgentType.TIMMY,
|
||||
issue_number=issue_number,
|
||||
status=DispatchStatus.FAILED,
|
||||
error="`title` is required.",
|
||||
)
|
||||
|
||||
resolved_type = task_type or infer_task_type(title, description)
|
||||
resolved_agent = agent or select_agent(resolved_type)
|
||||
|
||||
logger.info(
|
||||
"Dispatching task %r → %s (type=%s, issue=#%s)",
|
||||
title[:60],
|
||||
resolved_agent.value,
|
||||
resolved_type.value,
|
||||
issue_number,
|
||||
)
|
||||
|
||||
spec = AGENT_REGISTRY[resolved_agent]
|
||||
|
||||
last_result: DispatchResult | None = None
|
||||
for attempt in range(max_retries + 1):
|
||||
if attempt > 0:
|
||||
logger.info("Retry %d/%d for task %r", attempt, max_retries, title[:60])
|
||||
|
||||
if spec.interface == "gitea" and issue_number is not None:
|
||||
result = await _dispatch_via_gitea(
|
||||
resolved_agent, issue_number, title, description, criteria
|
||||
)
|
||||
elif spec.interface == "api":
|
||||
result = await _dispatch_via_api(
|
||||
resolved_agent, title, description, criteria, issue_number, api_endpoint
|
||||
)
|
||||
else:
|
||||
result = await _dispatch_local(title, description, criteria, issue_number)
|
||||
|
||||
result.retry_count = attempt
|
||||
last_result = result
|
||||
|
||||
if result.success:
|
||||
return result
|
||||
|
||||
logger.warning(
|
||||
"Dispatch attempt %d failed for task %r: %s",
|
||||
attempt + 1,
|
||||
title[:60],
|
||||
result.error,
|
||||
)
|
||||
|
||||
# All attempts exhausted — escalate
|
||||
assert last_result is not None
|
||||
last_result.status = DispatchStatus.ESCALATED
|
||||
logger.error(
|
||||
"Task %r escalated after %d failed attempt(s): %s",
|
||||
title[:60],
|
||||
max_retries + 1,
|
||||
last_result.error,
|
||||
)
|
||||
|
||||
# Try to log the escalation on the issue
|
||||
if issue_number is not None:
|
||||
await _log_escalation(issue_number, resolved_agent, last_result.error or "unknown error")
|
||||
|
||||
return last_result
|
||||
|
||||
|
||||
async def _log_escalation(
|
||||
issue_number: int,
|
||||
agent: AgentType,
|
||||
error: str,
|
||||
) -> None:
|
||||
"""Post an escalation notice on the Gitea issue."""
|
||||
try:
|
||||
import httpx
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
body = (
|
||||
f"## Dispatch Escalated\n\n"
|
||||
f"Could not assign to **{AGENT_REGISTRY[agent].display_name}** "
|
||||
f"after {1} attempt(s).\n\n"
|
||||
f"**Error:** {error}\n\n"
|
||||
f"Manual intervention required.\n\n"
|
||||
f"---\n*Timmy agent dispatcher.*"
|
||||
)
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
await _post_gitea_comment(
|
||||
client, base_url, repo, headers, issue_number, body
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("Failed to post escalation comment: %s", exc)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Monitoring helper
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
async def wait_for_completion(
|
||||
issue_number: int,
|
||||
poll_interval: int = 60,
|
||||
max_wait: int = 7200,
|
||||
) -> DispatchStatus:
|
||||
"""Block until the assigned Gitea issue is closed or the timeout fires.
|
||||
|
||||
Useful for synchronous orchestration where the caller wants to wait for
|
||||
the assigned agent to finish before proceeding.
|
||||
|
||||
Args:
|
||||
issue_number: Gitea issue to monitor.
|
||||
poll_interval: Seconds between status polls.
|
||||
max_wait: Maximum wait in seconds (default 2 hours).
|
||||
|
||||
Returns:
|
||||
:attr:`DispatchStatus.COMPLETED` or :attr:`DispatchStatus.TIMED_OUT`.
|
||||
"""
|
||||
return await _poll_issue_completion(issue_number, poll_interval, max_wait)
|
||||
@@ -462,7 +462,8 @@ def consult_grok(query: str) -> str:
|
||||
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
|
||||
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
|
||||
except (ImportError, OSError, ValueError) as exc:
|
||||
logger.warning("Tool execution failed (Lightning invoice): %s", exc)
|
||||
logger.error("Lightning invoice creation failed: %s", exc)
|
||||
return "Error: Failed to create Lightning invoice. Please check logs."
|
||||
|
||||
result = backend.run(query)
|
||||
|
||||
@@ -533,7 +534,8 @@ def _register_web_fetch_tool(toolkit: Toolkit) -> None:
|
||||
try:
|
||||
toolkit.register(web_fetch, name="web_fetch")
|
||||
except Exception as exc:
|
||||
logger.warning("Tool execution failed (web_fetch registration): %s", exc)
|
||||
logger.error("Failed to register web_fetch tool: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
|
||||
@@ -565,8 +567,8 @@ def _register_grok_tool(toolkit: Toolkit) -> None:
|
||||
toolkit.register(consult_grok, name="consult_grok")
|
||||
logger.info("Grok consultation tool registered")
|
||||
except (ImportError, AttributeError) as exc:
|
||||
logger.warning("Tool execution failed (Grok registration): %s", exc)
|
||||
logger.debug("Grok tool not available")
|
||||
logger.error("Failed to register Grok tool: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def _register_memory_tools(toolkit: Toolkit) -> None:
|
||||
@@ -579,8 +581,8 @@ def _register_memory_tools(toolkit: Toolkit) -> None:
|
||||
toolkit.register(memory_read, name="memory_read")
|
||||
toolkit.register(memory_forget, name="memory_forget")
|
||||
except (ImportError, AttributeError) as exc:
|
||||
logger.warning("Tool execution failed (Memory tools registration): %s", exc)
|
||||
logger.debug("Memory tools not available")
|
||||
logger.error("Failed to register Memory tools: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def _register_agentic_loop_tool(toolkit: Toolkit) -> None:
|
||||
@@ -628,8 +630,8 @@ def _register_agentic_loop_tool(toolkit: Toolkit) -> None:
|
||||
|
||||
toolkit.register(plan_and_execute, name="plan_and_execute")
|
||||
except (ImportError, AttributeError) as exc:
|
||||
logger.warning("Tool execution failed (plan_and_execute registration): %s", exc)
|
||||
logger.debug("plan_and_execute tool not available")
|
||||
logger.error("Failed to register plan_and_execute tool: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def _register_introspection_tools(toolkit: Toolkit) -> None:
|
||||
@@ -647,15 +649,16 @@ def _register_introspection_tools(toolkit: Toolkit) -> None:
|
||||
toolkit.register(get_memory_status, name="get_memory_status")
|
||||
toolkit.register(run_self_tests, name="run_self_tests")
|
||||
except (ImportError, AttributeError) as exc:
|
||||
logger.warning("Tool execution failed (Introspection tools registration): %s", exc)
|
||||
logger.debug("Introspection tools not available")
|
||||
logger.error("Failed to register Introspection tools: %s", exc)
|
||||
raise
|
||||
|
||||
try:
|
||||
from timmy.mcp_tools import update_gitea_avatar
|
||||
|
||||
toolkit.register(update_gitea_avatar, name="update_gitea_avatar")
|
||||
except (ImportError, AttributeError) as exc:
|
||||
logger.debug("update_gitea_avatar tool not available: %s", exc)
|
||||
logger.error("Failed to register update_gitea_avatar tool: %s", exc)
|
||||
raise
|
||||
|
||||
try:
|
||||
from timmy.session_logger import self_reflect, session_history
|
||||
@@ -663,8 +666,8 @@ def _register_introspection_tools(toolkit: Toolkit) -> None:
|
||||
toolkit.register(session_history, name="session_history")
|
||||
toolkit.register(self_reflect, name="self_reflect")
|
||||
except (ImportError, AttributeError) as exc:
|
||||
logger.warning("Tool execution failed (session_history registration): %s", exc)
|
||||
logger.debug("session_history tool not available")
|
||||
logger.error("Failed to register session_history tool: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def _register_delegation_tools(toolkit: Toolkit) -> None:
|
||||
@@ -676,8 +679,8 @@ def _register_delegation_tools(toolkit: Toolkit) -> None:
|
||||
toolkit.register(delegate_to_kimi, name="delegate_to_kimi")
|
||||
toolkit.register(list_swarm_agents, name="list_swarm_agents")
|
||||
except Exception as exc:
|
||||
logger.warning("Tool execution failed (Delegation tools registration): %s", exc)
|
||||
logger.debug("Delegation tools not available")
|
||||
logger.error("Failed to register Delegation tools: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def _register_gematria_tool(toolkit: Toolkit) -> None:
|
||||
@@ -687,8 +690,8 @@ def _register_gematria_tool(toolkit: Toolkit) -> None:
|
||||
|
||||
toolkit.register(gematria, name="gematria")
|
||||
except (ImportError, AttributeError) as exc:
|
||||
logger.warning("Tool execution failed (Gematria registration): %s", exc)
|
||||
logger.debug("Gematria tool not available")
|
||||
logger.error("Failed to register Gematria tool: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def _register_artifact_tools(toolkit: Toolkit) -> None:
|
||||
@@ -699,8 +702,8 @@ def _register_artifact_tools(toolkit: Toolkit) -> None:
|
||||
toolkit.register(jot_note, name="jot_note")
|
||||
toolkit.register(log_decision, name="log_decision")
|
||||
except (ImportError, AttributeError) as exc:
|
||||
logger.warning("Tool execution failed (Artifact tools registration): %s", exc)
|
||||
logger.debug("Artifact tools not available")
|
||||
logger.error("Failed to register Artifact tools: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def _register_thinking_tools(toolkit: Toolkit) -> None:
|
||||
@@ -710,8 +713,8 @@ def _register_thinking_tools(toolkit: Toolkit) -> None:
|
||||
|
||||
toolkit.register(search_thoughts, name="thought_search")
|
||||
except (ImportError, AttributeError) as exc:
|
||||
logger.warning("Tool execution failed (Thinking tools registration): %s", exc)
|
||||
logger.debug("Thinking tools not available")
|
||||
logger.error("Failed to register Thinking tools: %s", exc)
|
||||
raise
|
||||
|
||||
|
||||
def create_full_toolkit(base_dir: str | Path | None = None):
|
||||
|
||||
21
src/timmy/vassal/__init__.py
Normal file
21
src/timmy/vassal/__init__.py
Normal file
@@ -0,0 +1,21 @@
|
||||
"""Vassal Protocol — Timmy as autonomous orchestrator.
|
||||
|
||||
Timmy is Alex's vassal: the lead decision-maker for development direction,
|
||||
agent management, and house health. He observes the Gitea backlog, decides
|
||||
priorities, dispatches work to agents (Claude, Kimi, self), monitors output,
|
||||
and keeps Hermes (M3 Max) running well.
|
||||
|
||||
Public API
|
||||
----------
|
||||
from timmy.vassal import vassal_orchestrator
|
||||
|
||||
await vassal_orchestrator.run_cycle()
|
||||
snapshot = vassal_orchestrator.get_status()
|
||||
"""
|
||||
|
||||
from timmy.vassal.orchestration_loop import VassalOrchestrator
|
||||
|
||||
# Module-level singleton — import and use directly.
|
||||
vassal_orchestrator = VassalOrchestrator()
|
||||
|
||||
__all__ = ["VassalOrchestrator", "vassal_orchestrator"]
|
||||
296
src/timmy/vassal/agent_health.py
Normal file
296
src/timmy/vassal/agent_health.py
Normal file
@@ -0,0 +1,296 @@
|
||||
"""Vassal Protocol — agent health monitoring.
|
||||
|
||||
Monitors whether downstream agents (Claude, Kimi) are making progress on
|
||||
their assigned issues. Detects idle and stuck agents by querying Gitea
|
||||
for issues with dispatch labels and checking last-comment timestamps.
|
||||
|
||||
Stuck agent heuristic
|
||||
---------------------
|
||||
An agent is considered "stuck" on an issue if:
|
||||
- The issue has been labeled ``claude-ready`` or ``kimi-ready``
|
||||
- No new comment has appeared in the last ``stuck_threshold_minutes``
|
||||
- The issue has not been closed
|
||||
|
||||
Idle agent heuristic
|
||||
--------------------
|
||||
An agent is "idle" if it has no currently assigned (labeled) open issues.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime, timedelta
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_AGENT_LABELS = {
|
||||
"claude": "claude-ready",
|
||||
"kimi": "kimi-ready",
|
||||
}
|
||||
|
||||
_DEFAULT_STUCK_MINUTES = 120
|
||||
_DEFAULT_IDLE_THRESHOLD = 30
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data models
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentStatus:
|
||||
"""Health snapshot for one agent at a point in time."""
|
||||
|
||||
agent: str # "claude" | "kimi" | "timmy"
|
||||
is_idle: bool = True
|
||||
active_issue_numbers: list[int] = field(default_factory=list)
|
||||
stuck_issue_numbers: list[int] = field(default_factory=list)
|
||||
checked_at: str = field(
|
||||
default_factory=lambda: datetime.now(UTC).isoformat()
|
||||
)
|
||||
|
||||
@property
|
||||
def is_stuck(self) -> bool:
|
||||
return bool(self.stuck_issue_numbers)
|
||||
|
||||
@property
|
||||
def needs_reassignment(self) -> bool:
|
||||
return self.is_stuck
|
||||
|
||||
|
||||
@dataclass
|
||||
class AgentHealthReport:
|
||||
"""Combined health report for all monitored agents."""
|
||||
|
||||
agents: list[AgentStatus] = field(default_factory=list)
|
||||
generated_at: str = field(
|
||||
default_factory=lambda: datetime.now(UTC).isoformat()
|
||||
)
|
||||
|
||||
@property
|
||||
def any_stuck(self) -> bool:
|
||||
return any(a.is_stuck for a in self.agents)
|
||||
|
||||
@property
|
||||
def all_idle(self) -> bool:
|
||||
return all(a.is_idle for a in self.agents)
|
||||
|
||||
def for_agent(self, name: str) -> AgentStatus | None:
|
||||
for a in self.agents:
|
||||
if a.agent == name:
|
||||
return a
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea queries
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _fetch_labeled_issues(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
headers: dict,
|
||||
repo: str,
|
||||
label: str,
|
||||
) -> list[dict]:
|
||||
"""Return open issues carrying a specific label."""
|
||||
try:
|
||||
resp = await client.get(
|
||||
f"{base_url}/repos/{repo}/issues",
|
||||
headers=headers,
|
||||
params={"state": "open", "labels": label, "limit": 50},
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
return [i for i in resp.json() if not i.get("pull_request")]
|
||||
except Exception as exc:
|
||||
logger.warning("_fetch_labeled_issues: %s — %s", label, exc)
|
||||
return []
|
||||
|
||||
|
||||
async def _last_comment_time(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
headers: dict,
|
||||
repo: str,
|
||||
issue_number: int,
|
||||
) -> datetime | None:
|
||||
"""Return the timestamp of the most recent comment on an issue."""
|
||||
try:
|
||||
resp = await client.get(
|
||||
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
|
||||
headers=headers,
|
||||
params={"limit": 1},
|
||||
)
|
||||
if resp.status_code == 200:
|
||||
comments = resp.json()
|
||||
if comments:
|
||||
ts = comments[-1].get("updated_at") or comments[-1].get("created_at")
|
||||
if ts:
|
||||
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
except Exception as exc:
|
||||
logger.debug("_last_comment_time: issue #%d — %s", issue_number, exc)
|
||||
return None
|
||||
|
||||
|
||||
async def _issue_created_time(issue: dict) -> datetime | None:
|
||||
ts = issue.get("created_at")
|
||||
if ts:
|
||||
try:
|
||||
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
|
||||
except ValueError:
|
||||
pass
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Health check
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def check_agent_health(
|
||||
agent_name: str,
|
||||
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
|
||||
) -> AgentStatus:
|
||||
"""Query Gitea for issues assigned to *agent_name* and assess health.
|
||||
|
||||
Args:
|
||||
agent_name: One of "claude", "kimi".
|
||||
stuck_threshold_minutes: Minutes of silence before an issue is
|
||||
considered stuck.
|
||||
|
||||
Returns:
|
||||
AgentStatus for this agent.
|
||||
"""
|
||||
status = AgentStatus(agent=agent_name)
|
||||
|
||||
label = _AGENT_LABELS.get(agent_name)
|
||||
if not label:
|
||||
logger.debug("check_agent_health: unknown agent %s", agent_name)
|
||||
return status
|
||||
|
||||
try:
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
except ImportError as exc:
|
||||
logger.warning("check_agent_health: missing dependency — %s", exc)
|
||||
return status
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return status
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
cutoff = datetime.now(UTC) - timedelta(minutes=stuck_threshold_minutes)
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
issues = await _fetch_labeled_issues(
|
||||
client, base_url, headers, repo, label
|
||||
)
|
||||
|
||||
for issue in issues:
|
||||
num = issue.get("number", 0)
|
||||
status.active_issue_numbers.append(num)
|
||||
|
||||
# Check last activity
|
||||
last_activity = await _last_comment_time(
|
||||
client, base_url, headers, repo, num
|
||||
)
|
||||
if last_activity is None:
|
||||
last_activity = await _issue_created_time(issue)
|
||||
|
||||
if last_activity is not None and last_activity < cutoff:
|
||||
status.stuck_issue_numbers.append(num)
|
||||
logger.info(
|
||||
"check_agent_health: %s issue #%d stuck since %s",
|
||||
agent_name,
|
||||
num,
|
||||
last_activity.isoformat(),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.warning("check_agent_health: %s query failed — %s", agent_name, exc)
|
||||
|
||||
status.is_idle = len(status.active_issue_numbers) == 0
|
||||
return status
|
||||
|
||||
|
||||
async def get_full_health_report(
|
||||
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
|
||||
) -> AgentHealthReport:
|
||||
"""Run health checks for all monitored agents and return combined report.
|
||||
|
||||
Args:
|
||||
stuck_threshold_minutes: Passed through to each agent check.
|
||||
|
||||
Returns:
|
||||
AgentHealthReport with status for Claude and Kimi.
|
||||
"""
|
||||
import asyncio
|
||||
|
||||
claude_status, kimi_status = await asyncio.gather(
|
||||
check_agent_health("claude", stuck_threshold_minutes),
|
||||
check_agent_health("kimi", stuck_threshold_minutes),
|
||||
)
|
||||
return AgentHealthReport(agents=[claude_status, kimi_status])
|
||||
|
||||
|
||||
async def nudge_stuck_agent(
|
||||
agent_name: str,
|
||||
issue_number: int,
|
||||
) -> bool:
|
||||
"""Post a nudge comment on a stuck issue to prompt the agent.
|
||||
|
||||
Args:
|
||||
agent_name: The agent that appears stuck.
|
||||
issue_number: The Gitea issue number to nudge.
|
||||
|
||||
Returns:
|
||||
True if the comment was posted successfully.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
except ImportError as exc:
|
||||
logger.warning("nudge_stuck_agent: missing dependency — %s", exc)
|
||||
return False
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
return False
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
body = (
|
||||
f"⏰ **Vassal nudge** — @{agent_name} this issue has been idle.\n\n"
|
||||
"Please post a status update or close if complete."
|
||||
)
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=10) as client:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
|
||||
headers=headers,
|
||||
json={"body": body},
|
||||
)
|
||||
if resp.status_code in (200, 201):
|
||||
logger.info(
|
||||
"nudge_stuck_agent: nudged %s on issue #%d",
|
||||
agent_name,
|
||||
issue_number,
|
||||
)
|
||||
return True
|
||||
except Exception as exc:
|
||||
logger.warning("nudge_stuck_agent: failed — %s", exc)
|
||||
return False
|
||||
281
src/timmy/vassal/backlog.py
Normal file
281
src/timmy/vassal/backlog.py
Normal file
@@ -0,0 +1,281 @@
|
||||
"""Vassal Protocol — Gitea backlog triage.
|
||||
|
||||
Fetches open issues from Gitea, scores each one for priority and agent
|
||||
suitability, and returns a ranked list ready for dispatch.
|
||||
|
||||
Complexity scoring heuristics
|
||||
------------------------------
|
||||
high_complexity_keywords → route to Claude (architecture, refactor, review)
|
||||
research_keywords → route to Kimi (survey, analysis, benchmark)
|
||||
routine_keywords → route to Timmy/self (docs, chore, config)
|
||||
otherwise → Timmy self-handles
|
||||
|
||||
Priority scoring
|
||||
----------------
|
||||
URGENT label → 100
|
||||
HIGH / critical → 75
|
||||
NORMAL (default) → 50
|
||||
LOW / chore → 25
|
||||
Already assigned → deprioritized (subtract 20)
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from enum import StrEnum
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Constants
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
# Labels that hint at complexity level / agent suitability
|
||||
_HIGH_COMPLEXITY = frozenset(
|
||||
{
|
||||
"architecture",
|
||||
"refactor",
|
||||
"code review",
|
||||
"security",
|
||||
"performance",
|
||||
"breaking change",
|
||||
"design",
|
||||
"complex",
|
||||
}
|
||||
)
|
||||
|
||||
_RESEARCH_KEYWORDS = frozenset(
|
||||
{
|
||||
"research",
|
||||
"survey",
|
||||
"analysis",
|
||||
"benchmark",
|
||||
"comparative",
|
||||
"investigation",
|
||||
"deep dive",
|
||||
"review",
|
||||
}
|
||||
)
|
||||
|
||||
_ROUTINE_KEYWORDS = frozenset(
|
||||
{
|
||||
"docs",
|
||||
"documentation",
|
||||
"chore",
|
||||
"config",
|
||||
"typo",
|
||||
"rename",
|
||||
"cleanup",
|
||||
"trivial",
|
||||
"style",
|
||||
}
|
||||
)
|
||||
|
||||
_PRIORITY_LABEL_SCORES: dict[str, int] = {
|
||||
"urgent": 100,
|
||||
"critical": 90,
|
||||
"high": 75,
|
||||
"normal": 50,
|
||||
"low": 25,
|
||||
"chore": 20,
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data models
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class AgentTarget(StrEnum):
|
||||
"""Which agent should handle this issue."""
|
||||
|
||||
TIMMY = "timmy" # Timmy handles locally (self)
|
||||
CLAUDE = "claude" # Dispatch to Claude Code
|
||||
KIMI = "kimi" # Dispatch to Kimi Code
|
||||
|
||||
|
||||
@dataclass
|
||||
class TriagedIssue:
|
||||
"""A Gitea issue enriched with triage metadata."""
|
||||
|
||||
number: int
|
||||
title: str
|
||||
body: str
|
||||
labels: list[str] = field(default_factory=list)
|
||||
assignees: list[str] = field(default_factory=list)
|
||||
priority_score: int = 50
|
||||
agent_target: AgentTarget = AgentTarget.TIMMY
|
||||
rationale: str = ""
|
||||
url: str = ""
|
||||
raw: dict = field(default_factory=dict)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Scoring helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _extract_labels(issue: dict[str, Any]) -> list[str]:
|
||||
"""Return normalised label names from a raw Gitea issue dict."""
|
||||
return [lbl.get("name", "").lower() for lbl in issue.get("labels", [])]
|
||||
|
||||
|
||||
def _score_priority(labels: list[str], assignees: list[str]) -> int:
|
||||
score = _PRIORITY_LABEL_SCORES.get("normal", 50)
|
||||
for lbl in labels:
|
||||
for key, val in _PRIORITY_LABEL_SCORES.items():
|
||||
if key in lbl:
|
||||
score = max(score, val)
|
||||
if assignees:
|
||||
score -= 20 # already assigned — lower urgency for fresh dispatch
|
||||
return max(0, score)
|
||||
|
||||
|
||||
def _choose_agent(title: str, body: str, labels: list[str]) -> tuple[AgentTarget, str]:
|
||||
"""Heuristic: pick the best agent and return (target, rationale)."""
|
||||
combined = f"{title} {body} {' '.join(labels)}".lower()
|
||||
|
||||
if any(kw in combined for kw in _HIGH_COMPLEXITY):
|
||||
return AgentTarget.CLAUDE, "high-complexity keywords detected"
|
||||
|
||||
if any(kw in combined for kw in _RESEARCH_KEYWORDS):
|
||||
return AgentTarget.KIMI, "research keywords detected"
|
||||
|
||||
if any(kw in combined for kw in _ROUTINE_KEYWORDS):
|
||||
return AgentTarget.TIMMY, "routine task — Timmy self-handles"
|
||||
|
||||
return AgentTarget.TIMMY, "no specific routing signal — Timmy self-handles"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Triage
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def triage_issues(raw_issues: list[dict[str, Any]]) -> list[TriagedIssue]:
|
||||
"""Score and route a list of raw Gitea issue dicts.
|
||||
|
||||
Returns a list sorted by priority_score descending (highest first).
|
||||
|
||||
Args:
|
||||
raw_issues: List of issue objects from the Gitea API.
|
||||
|
||||
Returns:
|
||||
Sorted list of TriagedIssue with routing decisions.
|
||||
"""
|
||||
results: list[TriagedIssue] = []
|
||||
|
||||
for issue in raw_issues:
|
||||
number = issue.get("number", 0)
|
||||
title = issue.get("title", "")
|
||||
body = issue.get("body") or ""
|
||||
labels = _extract_labels(issue)
|
||||
assignees = [
|
||||
a.get("login", "") for a in issue.get("assignees") or []
|
||||
]
|
||||
url = issue.get("html_url", "")
|
||||
|
||||
priority = _score_priority(labels, assignees)
|
||||
agent, rationale = _choose_agent(title, body, labels)
|
||||
|
||||
results.append(
|
||||
TriagedIssue(
|
||||
number=number,
|
||||
title=title,
|
||||
body=body,
|
||||
labels=labels,
|
||||
assignees=assignees,
|
||||
priority_score=priority,
|
||||
agent_target=agent,
|
||||
rationale=rationale,
|
||||
url=url,
|
||||
raw=issue,
|
||||
)
|
||||
)
|
||||
|
||||
results.sort(key=lambda i: i.priority_score, reverse=True)
|
||||
logger.debug(
|
||||
"Triage complete: %d issues → %d Claude, %d Kimi, %d Timmy",
|
||||
len(results),
|
||||
sum(1 for i in results if i.agent_target == AgentTarget.CLAUDE),
|
||||
sum(1 for i in results if i.agent_target == AgentTarget.KIMI),
|
||||
sum(1 for i in results if i.agent_target == AgentTarget.TIMMY),
|
||||
)
|
||||
return results
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea fetch (async, gracefully degrading)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def fetch_open_issues(
|
||||
limit: int = 50,
|
||||
exclude_labels: list[str] | None = None,
|
||||
) -> list[dict[str, Any]]:
|
||||
"""Fetch open issues from the configured Gitea repo.
|
||||
|
||||
Args:
|
||||
limit: Maximum number of issues to return.
|
||||
exclude_labels: Labels whose issues should be skipped
|
||||
(e.g. ``["kimi-ready", "wip"]``).
|
||||
|
||||
Returns:
|
||||
List of raw issue dicts from the Gitea API,
|
||||
or empty list if Gitea is unavailable.
|
||||
"""
|
||||
try:
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
except ImportError as exc:
|
||||
logger.warning("fetch_open_issues: missing dependency — %s", exc)
|
||||
return []
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
logger.info("fetch_open_issues: Gitea disabled or no token")
|
||||
return []
|
||||
|
||||
exclude = set(lbl.lower() for lbl in (exclude_labels or []))
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {"Authorization": f"token {settings.gitea_token}"}
|
||||
params = {"state": "open", "limit": min(limit, 50), "page": 1}
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
resp = await client.get(
|
||||
f"{base_url}/repos/{repo}/issues",
|
||||
headers=headers,
|
||||
params=params,
|
||||
)
|
||||
if resp.status_code != 200:
|
||||
logger.warning(
|
||||
"fetch_open_issues: Gitea returned %s", resp.status_code
|
||||
)
|
||||
return []
|
||||
|
||||
issues = resp.json()
|
||||
|
||||
# Filter out pull requests and excluded labels
|
||||
filtered = []
|
||||
for issue in issues:
|
||||
if issue.get("pull_request"):
|
||||
continue # skip PRs
|
||||
labels = _extract_labels(issue)
|
||||
if exclude and any(lbl in exclude for lbl in labels):
|
||||
continue
|
||||
filtered.append(issue)
|
||||
|
||||
logger.info(
|
||||
"fetch_open_issues: fetched %d/%d issues (after filtering)",
|
||||
len(filtered),
|
||||
len(issues),
|
||||
)
|
||||
return filtered
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("fetch_open_issues: Gitea request failed — %s", exc)
|
||||
return []
|
||||
213
src/timmy/vassal/dispatch.py
Normal file
213
src/timmy/vassal/dispatch.py
Normal file
@@ -0,0 +1,213 @@
|
||||
"""Vassal Protocol — agent dispatch.
|
||||
|
||||
Translates triage decisions into concrete Gitea actions:
|
||||
- Add ``claude-ready`` or ``kimi-ready`` label to an issue
|
||||
- Post a dispatch comment recording the routing rationale
|
||||
- Record the dispatch in the in-memory registry so the orchestration loop
|
||||
can track what was sent and when
|
||||
|
||||
The dispatch registry is intentionally in-memory (ephemeral). Durable
|
||||
tracking is out of scope for this module — that belongs in the task queue
|
||||
or a future orchestration DB.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import logging
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
from timmy.vassal.backlog import AgentTarget, TriagedIssue
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Label names used by the dispatch system
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_LABEL_MAP: dict[AgentTarget, str] = {
|
||||
AgentTarget.CLAUDE: "claude-ready",
|
||||
AgentTarget.KIMI: "kimi-ready",
|
||||
AgentTarget.TIMMY: "timmy-ready",
|
||||
}
|
||||
|
||||
_LABEL_COLORS: dict[str, str] = {
|
||||
"claude-ready": "#8b6f47", # warm brown
|
||||
"kimi-ready": "#006b75", # dark teal
|
||||
"timmy-ready": "#0075ca", # blue
|
||||
}
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatch registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class DispatchRecord:
|
||||
"""A record of one issue being dispatched to an agent."""
|
||||
|
||||
issue_number: int
|
||||
issue_title: str
|
||||
agent: AgentTarget
|
||||
rationale: str
|
||||
dispatched_at: str = field(
|
||||
default_factory=lambda: datetime.now(UTC).isoformat()
|
||||
)
|
||||
label_applied: bool = False
|
||||
comment_posted: bool = False
|
||||
|
||||
|
||||
# Module-level registry: issue_number → DispatchRecord
|
||||
_registry: dict[int, DispatchRecord] = {}
|
||||
|
||||
|
||||
def get_dispatch_registry() -> dict[int, DispatchRecord]:
|
||||
"""Return a copy of the current dispatch registry."""
|
||||
return dict(_registry)
|
||||
|
||||
|
||||
def clear_dispatch_registry() -> None:
|
||||
"""Clear the dispatch registry (mainly for tests)."""
|
||||
_registry.clear()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Gitea helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def _get_or_create_label(
|
||||
client: Any,
|
||||
base_url: str,
|
||||
headers: dict,
|
||||
repo: str,
|
||||
label_name: str,
|
||||
) -> int | None:
|
||||
"""Return the Gitea label ID, creating it if necessary."""
|
||||
labels_url = f"{base_url}/repos/{repo}/labels"
|
||||
try:
|
||||
resp = await client.get(labels_url, headers=headers)
|
||||
if resp.status_code == 200:
|
||||
for lbl in resp.json():
|
||||
if lbl.get("name") == label_name:
|
||||
return lbl["id"]
|
||||
except Exception as exc:
|
||||
logger.warning("_get_or_create_label: list failed — %s", exc)
|
||||
return None
|
||||
|
||||
color = _LABEL_COLORS.get(label_name, "#cccccc")
|
||||
try:
|
||||
resp = await client.post(
|
||||
labels_url,
|
||||
headers={**headers, "Content-Type": "application/json"},
|
||||
json={"name": label_name, "color": color},
|
||||
)
|
||||
if resp.status_code in (200, 201):
|
||||
return resp.json().get("id")
|
||||
except Exception as exc:
|
||||
logger.warning("_get_or_create_label: create failed — %s", exc)
|
||||
|
||||
return None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Dispatch action
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
|
||||
"""Apply dispatch label and post a routing comment on the Gitea issue.
|
||||
|
||||
Gracefully degrades: if Gitea is unavailable the record is still
|
||||
created and returned (with label_applied=False, comment_posted=False).
|
||||
|
||||
Args:
|
||||
issue: A TriagedIssue with a routing decision.
|
||||
|
||||
Returns:
|
||||
DispatchRecord summarising what was done.
|
||||
"""
|
||||
record = DispatchRecord(
|
||||
issue_number=issue.number,
|
||||
issue_title=issue.title,
|
||||
agent=issue.agent_target,
|
||||
rationale=issue.rationale,
|
||||
)
|
||||
|
||||
if issue.agent_target == AgentTarget.TIMMY:
|
||||
# Self-dispatch: no label needed — Timmy will handle directly.
|
||||
logger.info(
|
||||
"dispatch_issue: #%d '%s' → Timmy (self, no label)",
|
||||
issue.number,
|
||||
issue.title[:50],
|
||||
)
|
||||
_registry[issue.number] = record
|
||||
return record
|
||||
|
||||
try:
|
||||
import httpx
|
||||
|
||||
from config import settings
|
||||
except ImportError as exc:
|
||||
logger.warning("dispatch_issue: missing dependency — %s", exc)
|
||||
_registry[issue.number] = record
|
||||
return record
|
||||
|
||||
if not settings.gitea_enabled or not settings.gitea_token:
|
||||
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
|
||||
_registry[issue.number] = record
|
||||
return record
|
||||
|
||||
base_url = f"{settings.gitea_url}/api/v1"
|
||||
repo = settings.gitea_repo
|
||||
headers = {
|
||||
"Authorization": f"token {settings.gitea_token}",
|
||||
"Content-Type": "application/json",
|
||||
}
|
||||
label_name = _LABEL_MAP[issue.agent_target]
|
||||
|
||||
try:
|
||||
async with httpx.AsyncClient(timeout=15) as client:
|
||||
label_id = await _get_or_create_label(
|
||||
client, base_url, headers, repo, label_name
|
||||
)
|
||||
|
||||
# Apply label
|
||||
if label_id is not None:
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
|
||||
headers=headers,
|
||||
json={"labels": [label_id]},
|
||||
)
|
||||
record.label_applied = resp.status_code in (200, 201)
|
||||
|
||||
# Post routing comment
|
||||
agent_name = issue.agent_target.value.capitalize()
|
||||
comment_body = (
|
||||
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
|
||||
f"Priority score: {issue.priority_score} \n"
|
||||
f"Rationale: {issue.rationale} \n"
|
||||
f"Label: `{label_name}`"
|
||||
)
|
||||
resp = await client.post(
|
||||
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
|
||||
headers=headers,
|
||||
json={"body": comment_body},
|
||||
)
|
||||
record.comment_posted = resp.status_code in (200, 201)
|
||||
|
||||
except Exception as exc:
|
||||
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
|
||||
|
||||
_registry[issue.number] = record
|
||||
logger.info(
|
||||
"dispatch_issue: #%d '%s' → %s (label=%s comment=%s)",
|
||||
issue.number,
|
||||
issue.title[:50],
|
||||
issue.agent_target,
|
||||
record.label_applied,
|
||||
record.comment_posted,
|
||||
)
|
||||
return record
|
||||
222
src/timmy/vassal/house_health.py
Normal file
222
src/timmy/vassal/house_health.py
Normal file
@@ -0,0 +1,222 @@
|
||||
"""Vassal Protocol — Hermes house health monitoring.
|
||||
|
||||
Monitors system resources on the M3 Max (Hermes) and Ollama model state.
|
||||
Reports warnings when resources are tight and provides cleanup utilities.
|
||||
|
||||
All I/O is wrapped in asyncio.to_thread() per CLAUDE.md convention.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import shutil
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
from pathlib import Path
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Thresholds
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_WARN_DISK_PCT = 85.0 # warn when disk is more than 85% full
|
||||
_WARN_MEM_PCT = 90.0 # warn when memory is more than 90% used
|
||||
_WARN_CPU_PCT = 95.0 # warn when CPU is above 95% sustained
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data models
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class DiskUsage:
|
||||
path: str = "/"
|
||||
total_gb: float = 0.0
|
||||
used_gb: float = 0.0
|
||||
free_gb: float = 0.0
|
||||
percent_used: float = 0.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class MemoryUsage:
|
||||
total_gb: float = 0.0
|
||||
available_gb: float = 0.0
|
||||
percent_used: float = 0.0
|
||||
|
||||
|
||||
@dataclass
|
||||
class OllamaHealth:
|
||||
reachable: bool = False
|
||||
loaded_models: list[str] = field(default_factory=list)
|
||||
error: str = ""
|
||||
|
||||
|
||||
@dataclass
|
||||
class SystemSnapshot:
|
||||
"""Point-in-time snapshot of Hermes resource usage."""
|
||||
|
||||
disk: DiskUsage = field(default_factory=DiskUsage)
|
||||
memory: MemoryUsage = field(default_factory=MemoryUsage)
|
||||
ollama: OllamaHealth = field(default_factory=OllamaHealth)
|
||||
warnings: list[str] = field(default_factory=list)
|
||||
taken_at: str = field(
|
||||
default_factory=lambda: datetime.now(UTC).isoformat()
|
||||
)
|
||||
|
||||
@property
|
||||
def healthy(self) -> bool:
|
||||
return len(self.warnings) == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Resource probes (sync, run in threads)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _probe_disk(path: str = "/") -> DiskUsage:
|
||||
try:
|
||||
usage = shutil.disk_usage(path)
|
||||
total_gb = usage.total / 1e9
|
||||
used_gb = usage.used / 1e9
|
||||
free_gb = usage.free / 1e9
|
||||
pct = (usage.used / usage.total * 100) if usage.total > 0 else 0.0
|
||||
return DiskUsage(
|
||||
path=path,
|
||||
total_gb=round(total_gb, 2),
|
||||
used_gb=round(used_gb, 2),
|
||||
free_gb=round(free_gb, 2),
|
||||
percent_used=round(pct, 1),
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("_probe_disk: %s", exc)
|
||||
return DiskUsage(path=path)
|
||||
|
||||
|
||||
def _probe_memory() -> MemoryUsage:
|
||||
try:
|
||||
import psutil # optional — gracefully degrade if absent
|
||||
|
||||
vm = psutil.virtual_memory()
|
||||
return MemoryUsage(
|
||||
total_gb=round(vm.total / 1e9, 2),
|
||||
available_gb=round(vm.available / 1e9, 2),
|
||||
percent_used=round(vm.percent, 1),
|
||||
)
|
||||
except ImportError:
|
||||
logger.debug("_probe_memory: psutil not installed — skipping")
|
||||
return MemoryUsage()
|
||||
except Exception as exc:
|
||||
logger.debug("_probe_memory: %s", exc)
|
||||
return MemoryUsage()
|
||||
|
||||
|
||||
def _probe_ollama_sync(ollama_url: str) -> OllamaHealth:
|
||||
"""Synchronous Ollama health probe — run in a thread."""
|
||||
try:
|
||||
import urllib.request
|
||||
import json
|
||||
|
||||
url = ollama_url.rstrip("/") + "/api/tags"
|
||||
with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310
|
||||
data = json.loads(resp.read())
|
||||
models = [m.get("name", "") for m in data.get("models", [])]
|
||||
return OllamaHealth(reachable=True, loaded_models=models)
|
||||
except Exception as exc:
|
||||
return OllamaHealth(reachable=False, error=str(exc)[:120])
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Public API
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
async def get_system_snapshot() -> SystemSnapshot:
|
||||
"""Collect a non-blocking snapshot of system resources.
|
||||
|
||||
Uses asyncio.to_thread() for all blocking I/O per project convention.
|
||||
|
||||
Returns:
|
||||
SystemSnapshot with disk, memory, and Ollama status.
|
||||
"""
|
||||
from config import settings
|
||||
|
||||
disk, memory, ollama = await asyncio.gather(
|
||||
asyncio.to_thread(_probe_disk, "/"),
|
||||
asyncio.to_thread(_probe_memory),
|
||||
asyncio.to_thread(_probe_ollama_sync, settings.normalized_ollama_url),
|
||||
)
|
||||
|
||||
warnings: list[str] = []
|
||||
|
||||
if disk.percent_used >= _WARN_DISK_PCT:
|
||||
warnings.append(
|
||||
f"Disk {disk.path}: {disk.percent_used:.0f}% used "
|
||||
f"({disk.free_gb:.1f} GB free)"
|
||||
)
|
||||
|
||||
if memory.percent_used >= _WARN_MEM_PCT:
|
||||
warnings.append(
|
||||
f"Memory: {memory.percent_used:.0f}% used "
|
||||
f"({memory.available_gb:.1f} GB available)"
|
||||
)
|
||||
|
||||
if not ollama.reachable:
|
||||
warnings.append(f"Ollama unreachable: {ollama.error}")
|
||||
|
||||
if warnings:
|
||||
logger.warning("House health warnings: %s", "; ".join(warnings))
|
||||
|
||||
return SystemSnapshot(
|
||||
disk=disk,
|
||||
memory=memory,
|
||||
ollama=ollama,
|
||||
warnings=warnings,
|
||||
)
|
||||
|
||||
|
||||
async def cleanup_stale_files(
|
||||
temp_dirs: list[str] | None = None,
|
||||
max_age_days: int = 7,
|
||||
) -> dict[str, Any]:
|
||||
"""Remove files older than *max_age_days* from temp directories.
|
||||
|
||||
Only removes files under safe temp paths (never project source).
|
||||
|
||||
Args:
|
||||
temp_dirs: Directories to scan. Defaults to ``["/tmp/timmy"]``.
|
||||
max_age_days: Age threshold in days.
|
||||
|
||||
Returns:
|
||||
Dict with ``deleted_count`` and ``errors``.
|
||||
"""
|
||||
import time
|
||||
|
||||
dirs = temp_dirs or ["/tmp/timmy"] # noqa: S108
|
||||
cutoff = time.time() - max_age_days * 86400
|
||||
deleted = 0
|
||||
errors: list[str] = []
|
||||
|
||||
def _cleanup() -> None:
|
||||
nonlocal deleted
|
||||
for d in dirs:
|
||||
p = Path(d)
|
||||
if not p.exists():
|
||||
continue
|
||||
for f in p.rglob("*"):
|
||||
if f.is_file():
|
||||
try:
|
||||
if f.stat().st_mtime < cutoff:
|
||||
f.unlink()
|
||||
deleted += 1
|
||||
except Exception as exc:
|
||||
errors.append(str(exc))
|
||||
|
||||
await asyncio.to_thread(_cleanup)
|
||||
logger.info(
|
||||
"cleanup_stale_files: deleted %d files, %d errors", deleted, len(errors)
|
||||
)
|
||||
return {"deleted_count": deleted, "errors": errors}
|
||||
321
src/timmy/vassal/orchestration_loop.py
Normal file
321
src/timmy/vassal/orchestration_loop.py
Normal file
@@ -0,0 +1,321 @@
|
||||
"""Vassal Protocol — main orchestration loop.
|
||||
|
||||
Ties the backlog, dispatch, agent health, and house health modules together
|
||||
into a single ``VassalOrchestrator`` that can run as a background service.
|
||||
|
||||
Each cycle:
|
||||
1. Fetch open Gitea issues
|
||||
2. Triage: score priority + route to agent
|
||||
3. Dispatch: apply labels / post routing comments
|
||||
4. Check agent health: nudge stuck agents
|
||||
5. Check house health: log warnings, trigger cleanup if needed
|
||||
6. Return a VassalCycleRecord summarising the cycle
|
||||
|
||||
Usage::
|
||||
|
||||
from timmy.vassal import vassal_orchestrator
|
||||
|
||||
record = await vassal_orchestrator.run_cycle()
|
||||
status = vassal_orchestrator.get_status()
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import logging
|
||||
import time
|
||||
from dataclasses import dataclass, field
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Cycle record
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@dataclass
|
||||
class VassalCycleRecord:
|
||||
"""Summary of one orchestration cycle."""
|
||||
|
||||
cycle_id: int
|
||||
started_at: str
|
||||
finished_at: str = ""
|
||||
duration_ms: int = 0
|
||||
|
||||
issues_fetched: int = 0
|
||||
issues_dispatched: int = 0
|
||||
dispatched_to_claude: int = 0
|
||||
dispatched_to_kimi: int = 0
|
||||
dispatched_to_timmy: int = 0
|
||||
|
||||
stuck_agents: list[str] = field(default_factory=list)
|
||||
nudges_sent: int = 0
|
||||
|
||||
house_warnings: list[str] = field(default_factory=list)
|
||||
cleanup_deleted: int = 0
|
||||
|
||||
errors: list[str] = field(default_factory=list)
|
||||
|
||||
@property
|
||||
def healthy(self) -> bool:
|
||||
return not self.errors and not self.house_warnings
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Orchestrator
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
class VassalOrchestrator:
|
||||
"""Timmy's autonomous orchestration engine.
|
||||
|
||||
Runs observe → triage → dispatch → monitor → house-check cycles on a
|
||||
configurable interval.
|
||||
|
||||
Parameters
|
||||
----------
|
||||
cycle_interval:
|
||||
Seconds between cycles. Defaults to ``settings.vassal_cycle_interval``
|
||||
when available, otherwise 300 s (5 min).
|
||||
max_dispatch_per_cycle:
|
||||
Cap on new dispatches per cycle to avoid spamming agents.
|
||||
"""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
cycle_interval: float | None = None,
|
||||
max_dispatch_per_cycle: int = 10,
|
||||
) -> None:
|
||||
self._cycle_count = 0
|
||||
self._running = False
|
||||
self._task: asyncio.Task | None = None
|
||||
self._max_dispatch = max_dispatch_per_cycle
|
||||
self._history: list[VassalCycleRecord] = []
|
||||
|
||||
# Resolve interval — lazy to avoid import-time settings read
|
||||
self._cycle_interval = cycle_interval
|
||||
|
||||
# -- public API --------------------------------------------------------
|
||||
|
||||
@property
|
||||
def cycle_count(self) -> int:
|
||||
return self._cycle_count
|
||||
|
||||
@property
|
||||
def is_running(self) -> bool:
|
||||
return self._running
|
||||
|
||||
@property
|
||||
def history(self) -> list[VassalCycleRecord]:
|
||||
return list(self._history)
|
||||
|
||||
def get_status(self) -> dict[str, Any]:
|
||||
"""Return a JSON-serialisable status dict."""
|
||||
last = self._history[-1] if self._history else None
|
||||
return {
|
||||
"running": self._running,
|
||||
"cycle_count": self._cycle_count,
|
||||
"last_cycle": {
|
||||
"cycle_id": last.cycle_id,
|
||||
"started_at": last.started_at,
|
||||
"issues_fetched": last.issues_fetched,
|
||||
"issues_dispatched": last.issues_dispatched,
|
||||
"stuck_agents": last.stuck_agents,
|
||||
"house_warnings": last.house_warnings,
|
||||
"healthy": last.healthy,
|
||||
}
|
||||
if last
|
||||
else None,
|
||||
}
|
||||
|
||||
# -- single cycle ------------------------------------------------------
|
||||
|
||||
async def run_cycle(self) -> VassalCycleRecord:
|
||||
"""Execute one full orchestration cycle.
|
||||
|
||||
Gracefully degrades at each step — a failure in one sub-task does
|
||||
not abort the rest of the cycle.
|
||||
|
||||
Returns:
|
||||
VassalCycleRecord summarising what happened.
|
||||
"""
|
||||
self._cycle_count += 1
|
||||
start = time.monotonic()
|
||||
record = VassalCycleRecord(
|
||||
cycle_id=self._cycle_count,
|
||||
started_at=datetime.now(UTC).isoformat(),
|
||||
)
|
||||
|
||||
# 1 + 2: Fetch & triage
|
||||
await self._step_backlog(record)
|
||||
|
||||
# 3: Agent health
|
||||
await self._step_agent_health(record)
|
||||
|
||||
# 4: House health
|
||||
await self._step_house_health(record)
|
||||
|
||||
# Finalise record
|
||||
record.finished_at = datetime.now(UTC).isoformat()
|
||||
record.duration_ms = int((time.monotonic() - start) * 1000)
|
||||
self._history.append(record)
|
||||
|
||||
# Broadcast via WebSocket (best-effort)
|
||||
await self._broadcast(record)
|
||||
|
||||
logger.info(
|
||||
"VassalOrchestrator cycle #%d complete (%d ms): "
|
||||
"fetched=%d dispatched=%d stuck=%s house_ok=%s",
|
||||
record.cycle_id,
|
||||
record.duration_ms,
|
||||
record.issues_fetched,
|
||||
record.issues_dispatched,
|
||||
record.stuck_agents or "none",
|
||||
not record.house_warnings,
|
||||
)
|
||||
return record
|
||||
|
||||
# -- background loop ---------------------------------------------------
|
||||
|
||||
async def start(self) -> None:
|
||||
"""Start the recurring orchestration loop as a background task."""
|
||||
if self._running:
|
||||
logger.warning("VassalOrchestrator already running")
|
||||
return
|
||||
self._running = True
|
||||
self._task = asyncio.ensure_future(self._loop())
|
||||
|
||||
def stop(self) -> None:
|
||||
"""Signal the loop to stop after the current cycle."""
|
||||
self._running = False
|
||||
if self._task and not self._task.done():
|
||||
self._task.cancel()
|
||||
logger.info("VassalOrchestrator stop requested")
|
||||
|
||||
async def _loop(self) -> None:
|
||||
interval = self._resolve_interval()
|
||||
logger.info("VassalOrchestrator loop started (interval=%.0fs)", interval)
|
||||
while self._running:
|
||||
try:
|
||||
await self.run_cycle()
|
||||
except Exception:
|
||||
logger.exception("VassalOrchestrator cycle failed")
|
||||
await asyncio.sleep(interval)
|
||||
|
||||
# -- step: backlog -------------------------------------------------------
|
||||
|
||||
async def _step_backlog(self, record: VassalCycleRecord) -> None:
|
||||
from timmy.vassal.backlog import fetch_open_issues, triage_issues
|
||||
from timmy.vassal.dispatch import dispatch_issue, get_dispatch_registry
|
||||
|
||||
try:
|
||||
raw_issues = await fetch_open_issues(
|
||||
limit=50,
|
||||
exclude_labels=["wip", "blocked", "needs-info"],
|
||||
)
|
||||
record.issues_fetched = len(raw_issues)
|
||||
|
||||
if not raw_issues:
|
||||
return
|
||||
|
||||
triaged = triage_issues(raw_issues)
|
||||
registry = get_dispatch_registry()
|
||||
|
||||
dispatched = 0
|
||||
for issue in triaged:
|
||||
if dispatched >= self._max_dispatch:
|
||||
break
|
||||
# Skip already-dispatched issues
|
||||
if issue.number in registry:
|
||||
continue
|
||||
await dispatch_issue(issue)
|
||||
dispatched += 1
|
||||
|
||||
from timmy.vassal.backlog import AgentTarget
|
||||
|
||||
if issue.agent_target == AgentTarget.CLAUDE:
|
||||
record.dispatched_to_claude += 1
|
||||
elif issue.agent_target == AgentTarget.KIMI:
|
||||
record.dispatched_to_kimi += 1
|
||||
else:
|
||||
record.dispatched_to_timmy += 1
|
||||
|
||||
record.issues_dispatched = dispatched
|
||||
|
||||
except Exception as exc:
|
||||
logger.exception("_step_backlog failed")
|
||||
record.errors.append(f"backlog: {exc}")
|
||||
|
||||
# -- step: agent health -------------------------------------------------
|
||||
|
||||
async def _step_agent_health(self, record: VassalCycleRecord) -> None:
|
||||
from config import settings
|
||||
from timmy.vassal.agent_health import get_full_health_report, nudge_stuck_agent
|
||||
|
||||
try:
|
||||
threshold = getattr(settings, "vassal_stuck_threshold_minutes", 120)
|
||||
report = await get_full_health_report(stuck_threshold_minutes=threshold)
|
||||
|
||||
for agent_status in report.agents:
|
||||
if agent_status.is_stuck:
|
||||
record.stuck_agents.append(agent_status.agent)
|
||||
for issue_num in agent_status.stuck_issue_numbers:
|
||||
ok = await nudge_stuck_agent(agent_status.agent, issue_num)
|
||||
if ok:
|
||||
record.nudges_sent += 1
|
||||
|
||||
except Exception as exc:
|
||||
logger.exception("_step_agent_health failed")
|
||||
record.errors.append(f"agent_health: {exc}")
|
||||
|
||||
# -- step: house health -------------------------------------------------
|
||||
|
||||
async def _step_house_health(self, record: VassalCycleRecord) -> None:
|
||||
from timmy.vassal.house_health import cleanup_stale_files, get_system_snapshot
|
||||
|
||||
try:
|
||||
snapshot = await get_system_snapshot()
|
||||
record.house_warnings = snapshot.warnings
|
||||
|
||||
# Auto-cleanup temp files when disk is getting tight
|
||||
if snapshot.disk.percent_used >= 80.0:
|
||||
result = await cleanup_stale_files(max_age_days=3)
|
||||
record.cleanup_deleted = result.get("deleted_count", 0)
|
||||
|
||||
except Exception as exc:
|
||||
logger.exception("_step_house_health failed")
|
||||
record.errors.append(f"house_health: {exc}")
|
||||
|
||||
# -- helpers ------------------------------------------------------------
|
||||
|
||||
def _resolve_interval(self) -> float:
|
||||
if self._cycle_interval is not None:
|
||||
return self._cycle_interval
|
||||
try:
|
||||
from config import settings
|
||||
|
||||
return float(getattr(settings, "vassal_cycle_interval", 300))
|
||||
except Exception:
|
||||
return 300.0
|
||||
|
||||
async def _broadcast(self, record: VassalCycleRecord) -> None:
|
||||
try:
|
||||
from infrastructure.ws_manager.handler import ws_manager
|
||||
|
||||
await ws_manager.broadcast(
|
||||
"vassal.cycle",
|
||||
{
|
||||
"cycle_id": record.cycle_id,
|
||||
"started_at": record.started_at,
|
||||
"issues_fetched": record.issues_fetched,
|
||||
"issues_dispatched": record.issues_dispatched,
|
||||
"stuck_agents": record.stuck_agents,
|
||||
"house_warnings": record.house_warnings,
|
||||
"duration_ms": record.duration_ms,
|
||||
"healthy": record.healthy,
|
||||
},
|
||||
)
|
||||
except Exception as exc:
|
||||
logger.debug("VassalOrchestrator broadcast skipped: %s", exc)
|
||||
503
tests/timmy/test_dispatcher.py
Normal file
503
tests/timmy/test_dispatcher.py
Normal file
@@ -0,0 +1,503 @@
|
||||
"""Tests for the agent dispatcher (timmy.dispatcher)."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.dispatcher import (
|
||||
AGENT_REGISTRY,
|
||||
AgentType,
|
||||
DispatchResult,
|
||||
DispatchStatus,
|
||||
TaskType,
|
||||
_dispatch_local,
|
||||
_dispatch_via_api,
|
||||
_dispatch_via_gitea,
|
||||
dispatch_task,
|
||||
infer_task_type,
|
||||
select_agent,
|
||||
wait_for_completion,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Agent registry
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestAgentRegistry:
|
||||
def test_all_agents_present(self):
|
||||
for member in AgentType:
|
||||
assert member in AGENT_REGISTRY, f"AgentType.{member.name} missing from registry"
|
||||
|
||||
def test_agent_specs_have_display_names(self):
|
||||
for agent, spec in AGENT_REGISTRY.items():
|
||||
assert spec.display_name, f"{agent} has empty display_name"
|
||||
|
||||
def test_gitea_agents_have_labels(self):
|
||||
for agent, spec in AGENT_REGISTRY.items():
|
||||
if spec.interface == "gitea":
|
||||
assert spec.gitea_label, f"{agent} is gitea interface but has no label"
|
||||
|
||||
def test_non_gitea_agents_have_no_labels(self):
|
||||
for agent, spec in AGENT_REGISTRY.items():
|
||||
if spec.interface not in ("gitea",):
|
||||
# api and local agents may have no label
|
||||
assert spec.gitea_label is None or spec.interface == "gitea"
|
||||
|
||||
def test_max_concurrent_positive(self):
|
||||
for agent, spec in AGENT_REGISTRY.items():
|
||||
assert spec.max_concurrent >= 1, f"{agent} has max_concurrent < 1"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# select_agent
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestSelectAgent:
|
||||
def test_architecture_routes_to_claude(self):
|
||||
assert select_agent(TaskType.ARCHITECTURE) == AgentType.CLAUDE_CODE
|
||||
|
||||
def test_refactoring_routes_to_claude(self):
|
||||
assert select_agent(TaskType.REFACTORING) == AgentType.CLAUDE_CODE
|
||||
|
||||
def test_code_review_routes_to_claude(self):
|
||||
assert select_agent(TaskType.CODE_REVIEW) == AgentType.CLAUDE_CODE
|
||||
|
||||
def test_routine_coding_routes_to_kimi(self):
|
||||
assert select_agent(TaskType.ROUTINE_CODING) == AgentType.KIMI_CODE
|
||||
|
||||
def test_fast_iteration_routes_to_kimi(self):
|
||||
assert select_agent(TaskType.FAST_ITERATION) == AgentType.KIMI_CODE
|
||||
|
||||
def test_research_routes_to_agent_api(self):
|
||||
assert select_agent(TaskType.RESEARCH) == AgentType.AGENT_API
|
||||
|
||||
def test_triage_routes_to_timmy(self):
|
||||
assert select_agent(TaskType.TRIAGE) == AgentType.TIMMY
|
||||
|
||||
def test_planning_routes_to_timmy(self):
|
||||
assert select_agent(TaskType.PLANNING) == AgentType.TIMMY
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# infer_task_type
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestInferTaskType:
|
||||
def test_architecture_keyword(self):
|
||||
assert infer_task_type("Design the LLM router architecture") == TaskType.ARCHITECTURE
|
||||
|
||||
def test_refactor_keyword(self):
|
||||
assert infer_task_type("Refactor the auth middleware") == TaskType.REFACTORING
|
||||
|
||||
def test_code_review_keyword(self):
|
||||
assert infer_task_type("Review PR for cascade router") == TaskType.CODE_REVIEW
|
||||
|
||||
def test_research_keyword(self):
|
||||
assert infer_task_type("Research embedding models") == TaskType.RESEARCH
|
||||
|
||||
def test_triage_keyword(self):
|
||||
assert infer_task_type("Triage open issues") == TaskType.TRIAGE
|
||||
|
||||
def test_planning_keyword(self):
|
||||
assert infer_task_type("Plan the v2.0 roadmap") == TaskType.PLANNING
|
||||
|
||||
def test_fallback_returns_routine_coding(self):
|
||||
assert infer_task_type("Do the thing") == TaskType.ROUTINE_CODING
|
||||
|
||||
def test_description_contributes_to_inference(self):
|
||||
result = infer_task_type("Implement feature", "We need to refactor the old code")
|
||||
assert result == TaskType.REFACTORING
|
||||
|
||||
def test_case_insensitive(self):
|
||||
assert infer_task_type("ARCHITECTURE DESIGN") == TaskType.ARCHITECTURE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DispatchResult
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchResult:
|
||||
def test_success_when_assigned(self):
|
||||
r = DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=AgentType.KIMI_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
)
|
||||
assert r.success is True
|
||||
|
||||
def test_success_when_completed(self):
|
||||
r = DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=AgentType.KIMI_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.COMPLETED,
|
||||
)
|
||||
assert r.success is True
|
||||
|
||||
def test_not_success_when_failed(self):
|
||||
r = DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=AgentType.KIMI_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.FAILED,
|
||||
)
|
||||
assert r.success is False
|
||||
|
||||
def test_not_success_when_escalated(self):
|
||||
r = DispatchResult(
|
||||
task_type=TaskType.ROUTINE_CODING,
|
||||
agent=AgentType.KIMI_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.ESCALATED,
|
||||
)
|
||||
assert r.success is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _dispatch_local
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchLocal:
|
||||
async def test_returns_assigned(self):
|
||||
result = await _dispatch_local(
|
||||
title="Plan the migration",
|
||||
description="We need a plan.",
|
||||
acceptance_criteria=["Plan is documented"],
|
||||
issue_number=42,
|
||||
)
|
||||
assert result.status == DispatchStatus.ASSIGNED
|
||||
assert result.agent == AgentType.TIMMY
|
||||
assert result.issue_number == 42
|
||||
|
||||
async def test_infers_task_type(self):
|
||||
result = await _dispatch_local(
|
||||
title="Plan the sprint",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
assert result.task_type == TaskType.PLANNING
|
||||
|
||||
async def test_no_issue_number(self):
|
||||
result = await _dispatch_local(title="Do something", description="")
|
||||
assert result.issue_number is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _dispatch_via_api
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchViaApi:
|
||||
async def test_no_endpoint_returns_failed(self):
|
||||
result = await _dispatch_via_api(
|
||||
agent=AgentType.AGENT_API,
|
||||
title="Analyse logs",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
assert "No API endpoint" in (result.error or "")
|
||||
|
||||
async def test_successful_api_call(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 202
|
||||
mock_resp.content = b'{"ok": true}'
|
||||
mock_resp.json.return_value = {"ok": True}
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_client.__aexit__ = AsyncMock(return_value=False)
|
||||
mock_client.post = AsyncMock(return_value=mock_resp)
|
||||
|
||||
with patch("httpx.AsyncClient", return_value=mock_client):
|
||||
result = await _dispatch_via_api(
|
||||
agent=AgentType.AGENT_API,
|
||||
title="Analyse logs",
|
||||
description="Look at the logs",
|
||||
acceptance_criteria=["Report produced"],
|
||||
endpoint="http://fake-agent/dispatch",
|
||||
)
|
||||
|
||||
assert result.status == DispatchStatus.ASSIGNED
|
||||
assert result.agent == AgentType.AGENT_API
|
||||
|
||||
async def test_api_error_returns_failed(self):
|
||||
mock_resp = MagicMock()
|
||||
mock_resp.status_code = 500
|
||||
mock_resp.text = "Internal Server Error"
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
|
||||
mock_client.__aexit__ = AsyncMock(return_value=False)
|
||||
mock_client.post = AsyncMock(return_value=mock_resp)
|
||||
|
||||
with patch("httpx.AsyncClient", return_value=mock_client):
|
||||
result = await _dispatch_via_api(
|
||||
agent=AgentType.AGENT_API,
|
||||
title="Analyse logs",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
endpoint="http://fake-agent/dispatch",
|
||||
)
|
||||
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
assert "500" in (result.error or "")
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _dispatch_via_gitea
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
_GITEA_SETTINGS = MagicMock(
|
||||
gitea_enabled=True,
|
||||
gitea_token="test-token",
|
||||
gitea_url="http://gitea.test",
|
||||
gitea_repo="owner/repo",
|
||||
)
|
||||
|
||||
|
||||
class TestDispatchViaGitea:
|
||||
def _make_client(self, label_list=None, label_create_status=201, comment_status=201):
|
||||
"""Build a mock httpx.AsyncClient for Gitea interactions."""
|
||||
label_resp = MagicMock()
|
||||
label_resp.status_code = 200
|
||||
label_resp.json.return_value = label_list or []
|
||||
|
||||
create_label_resp = MagicMock()
|
||||
create_label_resp.status_code = label_create_status
|
||||
create_label_resp.json.return_value = {"id": 99}
|
||||
|
||||
apply_label_resp = MagicMock()
|
||||
apply_label_resp.status_code = 201
|
||||
|
||||
comment_resp = MagicMock()
|
||||
comment_resp.status_code = comment_status
|
||||
comment_resp.json.return_value = {"id": 7}
|
||||
|
||||
client = AsyncMock()
|
||||
client.__aenter__ = AsyncMock(return_value=client)
|
||||
client.__aexit__ = AsyncMock(return_value=False)
|
||||
client.get = AsyncMock(return_value=label_resp)
|
||||
client.post = AsyncMock(side_effect=[create_label_resp, apply_label_resp, comment_resp])
|
||||
return client
|
||||
|
||||
async def test_successful_gitea_dispatch(self):
|
||||
client = self._make_client()
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
result = await _dispatch_via_gitea(
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1072,
|
||||
title="Design the router",
|
||||
description="We need a cascade router.",
|
||||
acceptance_criteria=["Failover works"],
|
||||
)
|
||||
|
||||
assert result.success
|
||||
assert result.agent == AgentType.CLAUDE_CODE
|
||||
assert result.issue_number == 1072
|
||||
assert result.status == DispatchStatus.ASSIGNED
|
||||
|
||||
async def test_no_gitea_token_returns_failed(self):
|
||||
bad_settings = MagicMock(gitea_enabled=True, gitea_token="", gitea_url="http://x", gitea_repo="a/b")
|
||||
with patch("timmy.dispatcher.settings", bad_settings):
|
||||
result = await _dispatch_via_gitea(
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
title="Some task",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
assert "not configured" in (result.error or "").lower()
|
||||
|
||||
async def test_gitea_disabled_returns_failed(self):
|
||||
bad_settings = MagicMock(gitea_enabled=False, gitea_token="tok", gitea_url="http://x", gitea_repo="a/b")
|
||||
with patch("timmy.dispatcher.settings", bad_settings):
|
||||
result = await _dispatch_via_gitea(
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
title="Some task",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
|
||||
async def test_existing_label_reused(self):
|
||||
"""When the label already exists, it should be reused (no creation call)."""
|
||||
label_resp = MagicMock()
|
||||
label_resp.status_code = 200
|
||||
label_resp.json.return_value = [{"name": "claude-ready", "id": 55}]
|
||||
|
||||
apply_resp = MagicMock()
|
||||
apply_resp.status_code = 201
|
||||
|
||||
comment_resp = MagicMock()
|
||||
comment_resp.status_code = 201
|
||||
comment_resp.json.return_value = {"id": 8}
|
||||
|
||||
client = AsyncMock()
|
||||
client.__aenter__ = AsyncMock(return_value=client)
|
||||
client.__aexit__ = AsyncMock(return_value=False)
|
||||
client.get = AsyncMock(return_value=label_resp)
|
||||
client.post = AsyncMock(side_effect=[apply_resp, comment_resp])
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
result = await _dispatch_via_gitea(
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=10,
|
||||
title="Architecture task",
|
||||
description="",
|
||||
acceptance_criteria=[],
|
||||
)
|
||||
|
||||
assert result.success
|
||||
# Should only have 2 POST calls: apply label + comment (no label creation)
|
||||
assert client.post.call_count == 2
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# dispatch_task (integration-style)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestDispatchTask:
|
||||
async def test_empty_title_returns_failed(self):
|
||||
result = await dispatch_task(title=" ")
|
||||
assert result.status == DispatchStatus.FAILED
|
||||
assert "`title` is required" in (result.error or "")
|
||||
|
||||
async def test_local_dispatch_for_timmy_task(self):
|
||||
result = await dispatch_task(
|
||||
title="Triage the open issues",
|
||||
description="We have 40 open issues.",
|
||||
acceptance_criteria=["Issues are labelled"],
|
||||
task_type=TaskType.TRIAGE,
|
||||
)
|
||||
assert result.agent == AgentType.TIMMY
|
||||
assert result.success
|
||||
|
||||
async def test_explicit_agent_override(self):
|
||||
"""Caller can force a specific agent regardless of task type."""
|
||||
result = await dispatch_task(
|
||||
title="Triage the open issues",
|
||||
agent=AgentType.TIMMY,
|
||||
)
|
||||
assert result.agent == AgentType.TIMMY
|
||||
|
||||
async def test_gitea_dispatch_when_issue_provided(self):
|
||||
client_mock = AsyncMock()
|
||||
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
||||
client_mock.__aexit__ = AsyncMock(return_value=False)
|
||||
client_mock.get = AsyncMock(return_value=MagicMock(status_code=200, json=MagicMock(return_value=[])))
|
||||
create_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 1}))
|
||||
apply_resp = MagicMock(status_code=201)
|
||||
comment_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 5}))
|
||||
client_mock.post = AsyncMock(side_effect=[create_resp, apply_resp, comment_resp])
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client_mock),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
result = await dispatch_task(
|
||||
title="Design the cascade router",
|
||||
description="Architecture task.",
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
issue_number=1072,
|
||||
)
|
||||
|
||||
assert result.agent == AgentType.CLAUDE_CODE
|
||||
assert result.success
|
||||
|
||||
async def test_escalation_after_max_retries(self):
|
||||
"""If all attempts fail, the result is ESCALATED."""
|
||||
with (
|
||||
patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch,
|
||||
patch("timmy.dispatcher._log_escalation", new_callable=AsyncMock),
|
||||
):
|
||||
mock_dispatch.return_value = DispatchResult(
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.FAILED,
|
||||
error="Gitea offline",
|
||||
)
|
||||
result = await dispatch_task(
|
||||
title="Design router",
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
issue_number=1,
|
||||
max_retries=1,
|
||||
)
|
||||
|
||||
assert result.status == DispatchStatus.ESCALATED
|
||||
assert mock_dispatch.call_count == 2 # initial + 1 retry
|
||||
|
||||
async def test_no_retry_on_success(self):
|
||||
with patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch:
|
||||
mock_dispatch.return_value = DispatchResult(
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
agent=AgentType.CLAUDE_CODE,
|
||||
issue_number=1,
|
||||
status=DispatchStatus.ASSIGNED,
|
||||
comment_id=42,
|
||||
label_applied="claude-ready",
|
||||
)
|
||||
result = await dispatch_task(
|
||||
title="Design router",
|
||||
task_type=TaskType.ARCHITECTURE,
|
||||
issue_number=1,
|
||||
max_retries=2,
|
||||
)
|
||||
|
||||
assert result.success
|
||||
assert mock_dispatch.call_count == 1 # no retries needed
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# wait_for_completion
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
class TestWaitForCompletion:
|
||||
async def test_returns_completed_when_issue_closed(self):
|
||||
closed_resp = MagicMock(
|
||||
status_code=200,
|
||||
json=MagicMock(return_value={"state": "closed"}),
|
||||
)
|
||||
client_mock = AsyncMock()
|
||||
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
||||
client_mock.__aexit__ = AsyncMock(return_value=False)
|
||||
client_mock.get = AsyncMock(return_value=closed_resp)
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client_mock),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
):
|
||||
status = await wait_for_completion(issue_number=42, poll_interval=0, max_wait=5)
|
||||
|
||||
assert status == DispatchStatus.COMPLETED
|
||||
|
||||
async def test_returns_timed_out_when_still_open(self):
|
||||
open_resp = MagicMock(
|
||||
status_code=200,
|
||||
json=MagicMock(return_value={"state": "open"}),
|
||||
)
|
||||
client_mock = AsyncMock()
|
||||
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
|
||||
client_mock.__aexit__ = AsyncMock(return_value=False)
|
||||
client_mock.get = AsyncMock(return_value=open_resp)
|
||||
|
||||
with (
|
||||
patch("httpx.AsyncClient", return_value=client_mock),
|
||||
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
|
||||
patch("asyncio.sleep", new_callable=AsyncMock),
|
||||
):
|
||||
status = await wait_for_completion(issue_number=42, poll_interval=1, max_wait=2)
|
||||
|
||||
assert status == DispatchStatus.TIMED_OUT
|
||||
621
tests/unit/test_backlog_triage.py
Normal file
621
tests/unit/test_backlog_triage.py
Normal file
@@ -0,0 +1,621 @@
|
||||
"""Unit tests for timmy.backlog_triage — autonomous backlog triage loop."""
|
||||
|
||||
from datetime import UTC, datetime
|
||||
from unittest.mock import AsyncMock, MagicMock, patch
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.backlog_triage import (
|
||||
AGENT_CLAUDE,
|
||||
AGENT_KIMI,
|
||||
KIMI_READY_LABEL,
|
||||
OWNER_LOGIN,
|
||||
READY_THRESHOLD,
|
||||
BacklogTriageLoop,
|
||||
ScoredIssue,
|
||||
TriageCycleResult,
|
||||
TriageDecision,
|
||||
_build_audit_comment,
|
||||
_build_daily_summary,
|
||||
_extract_tags,
|
||||
_score_acceptance,
|
||||
_score_alignment,
|
||||
_score_scope,
|
||||
decide,
|
||||
score_issue,
|
||||
)
|
||||
|
||||
|
||||
# ── Fixtures ─────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
def _make_raw_issue(
|
||||
number: int = 1,
|
||||
title: str = "Fix the login bug",
|
||||
body: str = "## Problem\nLogin fails on empty password.\n\n## Steps\nassert response == 200",
|
||||
labels: list | None = None,
|
||||
assignees: list | None = None,
|
||||
created_at: str = "2026-03-20T10:00:00Z",
|
||||
) -> dict:
|
||||
return {
|
||||
"number": number,
|
||||
"title": title,
|
||||
"body": body,
|
||||
"labels": [{"name": lbl} for lbl in (labels or [])],
|
||||
"assignees": [{"login": a} for a in (assignees or [])],
|
||||
"created_at": created_at,
|
||||
}
|
||||
|
||||
|
||||
def _make_scored_issue(
|
||||
number: int = 1,
|
||||
title: str = "Fix login bug",
|
||||
issue_type: str = "bug",
|
||||
score: int = 7,
|
||||
ready: bool = True,
|
||||
is_p0: bool = True,
|
||||
is_blocked: bool = False,
|
||||
assignees: list | None = None,
|
||||
tags: set | None = None,
|
||||
labels: list | None = None,
|
||||
age_days: int = 3,
|
||||
) -> ScoredIssue:
|
||||
return ScoredIssue(
|
||||
number=number,
|
||||
title=title,
|
||||
body="",
|
||||
labels=labels or [],
|
||||
tags=tags or {"bug"},
|
||||
assignees=assignees or [],
|
||||
created_at=datetime.now(UTC),
|
||||
issue_type=issue_type,
|
||||
score=score,
|
||||
scope=2,
|
||||
acceptance=2,
|
||||
alignment=3,
|
||||
ready=ready,
|
||||
age_days=age_days,
|
||||
is_p0=is_p0,
|
||||
is_blocked=is_blocked,
|
||||
)
|
||||
|
||||
|
||||
# ── _extract_tags ─────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestExtractTags:
|
||||
def test_bracket_tags_in_title(self):
|
||||
tags = _extract_tags("[Bug] Login fails", [])
|
||||
assert "bug" in tags
|
||||
|
||||
def test_multiple_brackets(self):
|
||||
tags = _extract_tags("[Bug][P0] Crash on startup", [])
|
||||
assert "bug" in tags
|
||||
assert "p0" in tags
|
||||
|
||||
def test_label_names(self):
|
||||
tags = _extract_tags("Fix thing", ["security", "hotfix"])
|
||||
assert "security" in tags
|
||||
assert "hotfix" in tags
|
||||
|
||||
def test_labels_lowercased(self):
|
||||
tags = _extract_tags("Title", ["Bug", "FEATURE"])
|
||||
assert "bug" in tags
|
||||
assert "feature" in tags
|
||||
|
||||
def test_empty_inputs(self):
|
||||
tags = _extract_tags("", [])
|
||||
assert tags == set()
|
||||
|
||||
|
||||
# ── Scoring functions ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestScoreScope:
|
||||
def test_file_reference_adds_point(self):
|
||||
score = _score_scope("Fix auth", "Edit src/timmy/auth.py", set())
|
||||
assert score >= 1
|
||||
|
||||
def test_function_reference_adds_point(self):
|
||||
score = _score_scope("Fix auth", "def validate_token()", set())
|
||||
assert score >= 1
|
||||
|
||||
def test_short_title_adds_point(self):
|
||||
score = _score_scope("Short title", "", set())
|
||||
assert score >= 1
|
||||
|
||||
def test_meta_tag_penalizes(self):
|
||||
score = _score_scope("Discussion about philosophy", "long body " * 5, {"philosophy"})
|
||||
assert score <= 1
|
||||
|
||||
def test_max_score_3(self):
|
||||
score = _score_scope("Fix auth", "src/auth.py\ndef login()", set())
|
||||
assert score <= 3
|
||||
|
||||
|
||||
class TestScoreAcceptance:
|
||||
def test_acceptance_keywords(self):
|
||||
body = "should return 200\nmust pass tests\nexpect response"
|
||||
score = _score_acceptance("Title", body, set())
|
||||
assert score >= 2
|
||||
|
||||
def test_test_reference_adds_point(self):
|
||||
score = _score_acceptance("Title", "Run tox -e unit", set())
|
||||
assert score >= 1
|
||||
|
||||
def test_structured_sections(self):
|
||||
body = "## Problem\nX\n## Solution\nY"
|
||||
score = _score_acceptance("Title", body, set())
|
||||
assert score >= 1
|
||||
|
||||
def test_meta_tag_penalizes(self):
|
||||
score = _score_acceptance("Title", "should do something", {"philosophy"})
|
||||
# still counts but penalized
|
||||
assert score <= 2
|
||||
|
||||
def test_empty_body(self):
|
||||
score = _score_acceptance("Title", "", set())
|
||||
assert score == 0
|
||||
|
||||
|
||||
class TestScoreAlignment:
|
||||
def test_bug_tags_score_max(self):
|
||||
assert _score_alignment("", "", {"bug"}) == 3
|
||||
|
||||
def test_hotfix_tag_max(self):
|
||||
assert _score_alignment("", "", {"hotfix"}) == 3
|
||||
|
||||
def test_refactor_tag(self):
|
||||
score = _score_alignment("", "", {"refactor"})
|
||||
assert score >= 2
|
||||
|
||||
def test_feature_tag(self):
|
||||
score = _score_alignment("", "", {"feature"})
|
||||
assert score >= 2
|
||||
|
||||
def test_meta_tags_zero(self):
|
||||
assert _score_alignment("", "", {"philosophy"}) == 0
|
||||
|
||||
def test_loop_generated_bonus(self):
|
||||
score = _score_alignment("", "", {"loop-generated"})
|
||||
assert score >= 1
|
||||
|
||||
|
||||
# ── score_issue ───────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestScoreIssue:
|
||||
def test_bug_issue_classified_correctly(self):
|
||||
raw = _make_raw_issue(labels=["bug"], title="[Bug] Crash on startup")
|
||||
scored = score_issue(raw)
|
||||
assert scored.issue_type == "bug"
|
||||
assert scored.is_p0 is True
|
||||
|
||||
def test_feature_issue_classified(self):
|
||||
raw = _make_raw_issue(labels=["feature"], title="Add voice support")
|
||||
scored = score_issue(raw)
|
||||
assert scored.issue_type == "feature"
|
||||
|
||||
def test_philosophy_issue_classified(self):
|
||||
raw = _make_raw_issue(labels=["philosophy"], title="[Philosophy] Should Timmy sleep?")
|
||||
scored = score_issue(raw)
|
||||
assert scored.issue_type == "philosophy"
|
||||
|
||||
def test_research_issue_classified(self):
|
||||
raw = _make_raw_issue(labels=["research"], title="Investigate model options")
|
||||
scored = score_issue(raw)
|
||||
assert scored.issue_type == "research"
|
||||
|
||||
def test_ready_flag_set_when_score_high(self):
|
||||
body = (
|
||||
"## Problem\nX breaks.\n## Solution\nFix src/timmy/agent.py def run()\n"
|
||||
"should return True\nmust pass tox -e unit"
|
||||
)
|
||||
raw = _make_raw_issue(labels=["bug"], body=body)
|
||||
scored = score_issue(raw)
|
||||
assert scored.score >= READY_THRESHOLD
|
||||
assert scored.ready is True
|
||||
|
||||
def test_is_blocked_detected_in_body(self):
|
||||
raw = _make_raw_issue(body="This is blocked by issue #50")
|
||||
scored = score_issue(raw)
|
||||
assert scored.is_blocked is True
|
||||
|
||||
def test_is_blocked_detected_in_title(self):
|
||||
raw = _make_raw_issue(title="[blocking] Cannot proceed")
|
||||
scored = score_issue(raw)
|
||||
# "blocking" in brackets becomes a tag
|
||||
assert scored.is_blocked is True
|
||||
|
||||
def test_unassigned_when_no_assignees(self):
|
||||
raw = _make_raw_issue(assignees=[])
|
||||
scored = score_issue(raw)
|
||||
assert scored.is_unassigned is True
|
||||
|
||||
def test_assigned_when_has_assignee(self):
|
||||
raw = _make_raw_issue(assignees=["claude"])
|
||||
scored = score_issue(raw)
|
||||
assert scored.is_unassigned is False
|
||||
|
||||
def test_age_days_computed(self):
|
||||
old_ts = "2026-01-01T00:00:00Z"
|
||||
raw = _make_raw_issue(created_at=old_ts)
|
||||
scored = score_issue(raw)
|
||||
assert scored.age_days > 0
|
||||
|
||||
def test_needs_kimi_for_research_label(self):
|
||||
raw = _make_raw_issue(labels=["kimi-ready"])
|
||||
scored = score_issue(raw)
|
||||
assert scored.needs_kimi is True
|
||||
|
||||
|
||||
# ── decide ────────────────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestDecide:
|
||||
def test_philosophy_skipped(self):
|
||||
issue = _make_scored_issue(issue_type="philosophy", tags={"philosophy"})
|
||||
d = decide(issue)
|
||||
assert d.action == "skip"
|
||||
assert "philosophy" in d.reason.lower()
|
||||
|
||||
def test_assigned_issue_skipped(self):
|
||||
issue = _make_scored_issue(assignees=["perplexity"])
|
||||
d = decide(issue)
|
||||
assert d.action == "skip"
|
||||
assert "assigned" in d.reason.lower()
|
||||
|
||||
def test_low_score_skipped(self):
|
||||
issue = _make_scored_issue(score=2, ready=False)
|
||||
d = decide(issue)
|
||||
assert d.action == "skip"
|
||||
assert "threshold" in d.reason.lower()
|
||||
|
||||
def test_blocked_issue_flagged_for_alex(self):
|
||||
issue = _make_scored_issue(is_blocked=True)
|
||||
d = decide(issue)
|
||||
assert d.action == "flag_alex"
|
||||
assert d.agent == OWNER_LOGIN
|
||||
|
||||
def test_research_issue_assigned_kimi(self):
|
||||
issue = _make_scored_issue(
|
||||
issue_type="research",
|
||||
tags={"research"},
|
||||
is_p0=False,
|
||||
is_blocked=False,
|
||||
)
|
||||
d = decide(issue)
|
||||
assert d.action == "assign_kimi"
|
||||
assert d.agent == AGENT_KIMI
|
||||
|
||||
def test_kimi_ready_label_assigns_kimi(self):
|
||||
issue = _make_scored_issue(
|
||||
issue_type="unknown",
|
||||
tags={"kimi-ready"},
|
||||
labels=["kimi-ready"],
|
||||
is_p0=False,
|
||||
is_blocked=False,
|
||||
)
|
||||
d = decide(issue)
|
||||
assert d.action == "assign_kimi"
|
||||
|
||||
def test_p0_bug_assigns_claude(self):
|
||||
issue = _make_scored_issue(issue_type="bug", is_p0=True, is_blocked=False)
|
||||
d = decide(issue)
|
||||
assert d.action == "assign_claude"
|
||||
assert d.agent == AGENT_CLAUDE
|
||||
|
||||
def test_ready_feature_assigns_claude(self):
|
||||
issue = _make_scored_issue(
|
||||
issue_type="feature",
|
||||
is_p0=False,
|
||||
is_blocked=False,
|
||||
tags={"feature"},
|
||||
)
|
||||
d = decide(issue)
|
||||
assert d.action == "assign_claude"
|
||||
assert d.agent == AGENT_CLAUDE
|
||||
|
||||
def test_decision_has_reason(self):
|
||||
issue = _make_scored_issue()
|
||||
d = decide(issue)
|
||||
assert len(d.reason) > 10
|
||||
|
||||
|
||||
# ── _build_audit_comment ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestBuildAuditComment:
|
||||
def test_contains_timmy_triage_header(self):
|
||||
d = TriageDecision(42, "assign_claude", "High priority bug", agent=AGENT_CLAUDE)
|
||||
comment = _build_audit_comment(d)
|
||||
assert "Timmy Triage" in comment
|
||||
|
||||
def test_contains_issue_reason(self):
|
||||
d = TriageDecision(42, "assign_claude", "Urgent P0 bug", agent=AGENT_CLAUDE)
|
||||
comment = _build_audit_comment(d)
|
||||
assert "Urgent P0 bug" in comment
|
||||
|
||||
def test_assign_claude_mentions_agent(self):
|
||||
d = TriageDecision(42, "assign_claude", "reason", agent=AGENT_CLAUDE)
|
||||
comment = _build_audit_comment(d)
|
||||
assert AGENT_CLAUDE in comment
|
||||
|
||||
def test_assign_kimi_mentions_label(self):
|
||||
d = TriageDecision(42, "assign_kimi", "reason", agent=AGENT_KIMI)
|
||||
comment = _build_audit_comment(d)
|
||||
assert KIMI_READY_LABEL in comment
|
||||
|
||||
def test_flag_alex_mentions_owner(self):
|
||||
d = TriageDecision(42, "flag_alex", "blocked", agent=OWNER_LOGIN)
|
||||
comment = _build_audit_comment(d)
|
||||
assert OWNER_LOGIN in comment
|
||||
|
||||
def test_contains_override_note(self):
|
||||
d = TriageDecision(42, "assign_claude", "reason", agent=AGENT_CLAUDE)
|
||||
comment = _build_audit_comment(d)
|
||||
assert "override" in comment.lower()
|
||||
|
||||
|
||||
# ── _build_daily_summary ──────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestBuildDailySummary:
|
||||
def _make_result(self, decisions=None) -> TriageCycleResult:
|
||||
return TriageCycleResult(
|
||||
timestamp=datetime.now(UTC).isoformat(),
|
||||
total_open=10,
|
||||
scored=8,
|
||||
ready=5,
|
||||
decisions=decisions or [],
|
||||
)
|
||||
|
||||
def test_contains_open_count(self):
|
||||
result = self._make_result()
|
||||
scored = [_make_scored_issue(number=i, ready=True, score=6) for i in range(1, 4)]
|
||||
summary = _build_daily_summary(result, scored)
|
||||
assert "10" in summary # total_open
|
||||
|
||||
def test_contains_ready_count(self):
|
||||
result = self._make_result()
|
||||
summary = _build_daily_summary(result, [])
|
||||
assert "5" in summary
|
||||
|
||||
def test_actions_taken_section(self):
|
||||
decisions = [
|
||||
TriageDecision(1, "assign_claude", "P0 bug", agent="claude", executed=True),
|
||||
]
|
||||
result = self._make_result(decisions=decisions)
|
||||
summary = _build_daily_summary(result, [])
|
||||
assert "Actions Taken" in summary
|
||||
assert "#1" in summary
|
||||
|
||||
def test_top_issues_listed(self):
|
||||
scored = [_make_scored_issue(number=99, ready=True, score=8)]
|
||||
result = self._make_result()
|
||||
summary = _build_daily_summary(result, scored)
|
||||
assert "#99" in summary
|
||||
|
||||
def test_footer_present(self):
|
||||
summary = _build_daily_summary(self._make_result(), [])
|
||||
assert "Auto-generated" in summary
|
||||
|
||||
|
||||
# ── BacklogTriageLoop ─────────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestBacklogTriageLoop:
|
||||
def test_default_interval_from_settings(self):
|
||||
loop = BacklogTriageLoop()
|
||||
from config import settings
|
||||
|
||||
assert loop._interval == float(settings.backlog_triage_interval_seconds)
|
||||
|
||||
def test_custom_interval(self):
|
||||
loop = BacklogTriageLoop(interval=300)
|
||||
assert loop._interval == 300.0
|
||||
|
||||
def test_dry_run_default(self):
|
||||
loop = BacklogTriageLoop(dry_run=True)
|
||||
assert loop._dry_run is True
|
||||
|
||||
def test_not_running_initially(self):
|
||||
loop = BacklogTriageLoop()
|
||||
assert loop.is_running is False
|
||||
|
||||
def test_stop_sets_running_false(self):
|
||||
loop = BacklogTriageLoop()
|
||||
loop._running = True
|
||||
loop.stop()
|
||||
assert loop._running is False
|
||||
|
||||
def test_cycle_count_starts_zero(self):
|
||||
loop = BacklogTriageLoop()
|
||||
assert loop.cycle_count == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_once_skips_when_no_gitea_token(self):
|
||||
loop = BacklogTriageLoop()
|
||||
mock_settings = MagicMock()
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = ""
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = False
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
|
||||
with patch("timmy.backlog_triage.settings", mock_settings):
|
||||
result = await loop.run_once()
|
||||
|
||||
assert result.total_open == 0
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_once_dry_run_no_api_writes(self):
|
||||
"""In dry_run mode, decisions are made but no Gitea API writes happen."""
|
||||
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
|
||||
|
||||
raw_issues = [
|
||||
_make_raw_issue(
|
||||
number=10,
|
||||
title="Fix crash",
|
||||
labels=["bug"],
|
||||
body=(
|
||||
"## Problem\nCrash on login.\n## Solution\nFix src/auth.py "
|
||||
"def login()\nshould return 200\nmust pass tox tests"
|
||||
),
|
||||
)
|
||||
]
|
||||
|
||||
mock_settings = MagicMock()
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "fake-token"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
mock_settings.gitea_url = "http://gitea.local"
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = True
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.get.return_value = MagicMock(
|
||||
status_code=200, json=MagicMock(return_value=raw_issues)
|
||||
)
|
||||
|
||||
mock_ctx = AsyncMock()
|
||||
mock_ctx.__aenter__.return_value = mock_client
|
||||
mock_ctx.__aexit__.return_value = False
|
||||
|
||||
with (
|
||||
patch("timmy.backlog_triage.settings", mock_settings),
|
||||
patch("httpx.AsyncClient", return_value=mock_ctx),
|
||||
):
|
||||
result = await loop.run_once()
|
||||
|
||||
# No POST/PATCH calls in dry run
|
||||
mock_client.post.assert_not_called()
|
||||
mock_client.patch.assert_not_called()
|
||||
|
||||
assert result.total_open == 1
|
||||
assert loop.cycle_count == 1
|
||||
assert len(loop.history) == 1
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_once_assigns_unassigned_bug(self):
|
||||
"""Unassigned ready bug should be assigned to Claude with audit comment."""
|
||||
loop = BacklogTriageLoop(dry_run=False, daily_summary=False)
|
||||
|
||||
body = (
|
||||
"## Problem\nCrash on login.\n## Solution\nFix src/auth.py "
|
||||
"def login()\nshould return 200\nmust pass tox tests"
|
||||
)
|
||||
raw_issues = [_make_raw_issue(number=5, title="Fix crash", labels=["bug"], body=body)]
|
||||
|
||||
mock_settings = MagicMock()
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "fake-token"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
mock_settings.gitea_url = "http://gitea.local"
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = False
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
|
||||
# GET /issues returns our issue
|
||||
get_issues_resp = MagicMock(status_code=200)
|
||||
get_issues_resp.json.return_value = raw_issues
|
||||
|
||||
# POST /comments returns success
|
||||
comment_resp = MagicMock(status_code=201)
|
||||
comment_resp.json.return_value = {"id": 1}
|
||||
|
||||
# PATCH /issues/{n} (assign) returns success
|
||||
assign_resp = MagicMock(status_code=200)
|
||||
assign_resp.json.return_value = {"number": 5}
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.get.return_value = get_issues_resp
|
||||
mock_client.post.return_value = comment_resp
|
||||
mock_client.patch.return_value = assign_resp
|
||||
|
||||
mock_ctx = AsyncMock()
|
||||
mock_ctx.__aenter__.return_value = mock_client
|
||||
mock_ctx.__aexit__.return_value = False
|
||||
|
||||
with (
|
||||
patch("timmy.backlog_triage.settings", mock_settings),
|
||||
patch("httpx.AsyncClient", return_value=mock_ctx),
|
||||
patch("asyncio.sleep", new_callable=AsyncMock),
|
||||
):
|
||||
result = await loop.run_once()
|
||||
|
||||
assert result.total_open == 1
|
||||
# Comment should have been posted
|
||||
mock_client.post.assert_called()
|
||||
# Assign should have been called (PATCH)
|
||||
mock_client.patch.assert_called()
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_once_skips_already_assigned(self):
|
||||
"""Issues already assigned should not be acted upon."""
|
||||
loop = BacklogTriageLoop(dry_run=False, daily_summary=False)
|
||||
|
||||
raw_issues = [
|
||||
_make_raw_issue(
|
||||
number=3,
|
||||
labels=["bug"],
|
||||
assignees=["perplexity"],
|
||||
body="## Problem\nX\nmust pass tox\nshould return 200 at least 3 times",
|
||||
)
|
||||
]
|
||||
|
||||
mock_settings = MagicMock()
|
||||
mock_settings.gitea_enabled = True
|
||||
mock_settings.gitea_token = "tok"
|
||||
mock_settings.gitea_repo = "owner/repo"
|
||||
mock_settings.gitea_url = "http://gitea.local"
|
||||
mock_settings.backlog_triage_interval_seconds = 900
|
||||
mock_settings.backlog_triage_dry_run = False
|
||||
mock_settings.backlog_triage_daily_summary = False
|
||||
|
||||
get_resp = MagicMock(status_code=200)
|
||||
get_resp.json.return_value = raw_issues
|
||||
|
||||
mock_client = AsyncMock()
|
||||
mock_client.get.return_value = get_resp
|
||||
|
||||
mock_ctx = AsyncMock()
|
||||
mock_ctx.__aenter__.return_value = mock_client
|
||||
mock_ctx.__aexit__.return_value = False
|
||||
|
||||
with (
|
||||
patch("timmy.backlog_triage.settings", mock_settings),
|
||||
patch("httpx.AsyncClient", return_value=mock_ctx),
|
||||
):
|
||||
result = await loop.run_once()
|
||||
|
||||
# No writes for already-assigned issue
|
||||
mock_client.post.assert_not_called()
|
||||
mock_client.patch.assert_not_called()
|
||||
assert result.decisions[0].action == "skip"
|
||||
|
||||
|
||||
# ── ScoredIssue properties ────────────────────────────────────────────────────
|
||||
|
||||
|
||||
class TestScoredIssueProperties:
|
||||
def test_is_unassigned_true_when_no_assignees(self):
|
||||
issue = _make_scored_issue(assignees=[])
|
||||
assert issue.is_unassigned is True
|
||||
|
||||
def test_is_unassigned_false_when_assigned(self):
|
||||
issue = _make_scored_issue(assignees=["claude"])
|
||||
assert issue.is_unassigned is False
|
||||
|
||||
def test_needs_kimi_for_research_tag(self):
|
||||
issue = _make_scored_issue(tags={"research"})
|
||||
assert issue.needs_kimi is True
|
||||
|
||||
def test_needs_kimi_for_kimi_ready_label(self):
|
||||
issue = _make_scored_issue(labels=["kimi-ready"], tags=set())
|
||||
assert issue.needs_kimi is True
|
||||
|
||||
def test_needs_kimi_false_for_bug(self):
|
||||
issue = _make_scored_issue(tags={"bug"}, labels=[])
|
||||
assert issue.needs_kimi is False
|
||||
103
tests/unit/test_vassal_agent_health.py
Normal file
103
tests/unit/test_vassal_agent_health.py
Normal file
@@ -0,0 +1,103 @@
|
||||
"""Unit tests for timmy.vassal.agent_health."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.vassal.agent_health import AgentHealthReport, AgentStatus
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AgentStatus
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_agent_status_idle_default():
|
||||
s = AgentStatus(agent="claude")
|
||||
assert s.is_idle is True
|
||||
assert s.is_stuck is False
|
||||
assert s.needs_reassignment is False
|
||||
|
||||
|
||||
def test_agent_status_active():
|
||||
s = AgentStatus(agent="kimi", active_issue_numbers=[10, 11])
|
||||
s.is_idle = len(s.active_issue_numbers) == 0
|
||||
assert s.is_idle is False
|
||||
|
||||
|
||||
def test_agent_status_stuck():
|
||||
s = AgentStatus(
|
||||
agent="claude",
|
||||
active_issue_numbers=[7],
|
||||
stuck_issue_numbers=[7],
|
||||
is_idle=False,
|
||||
)
|
||||
assert s.is_stuck is True
|
||||
assert s.needs_reassignment is True
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# AgentHealthReport
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_report_any_stuck():
|
||||
claude = AgentStatus(agent="claude", stuck_issue_numbers=[3])
|
||||
kimi = AgentStatus(agent="kimi")
|
||||
report = AgentHealthReport(agents=[claude, kimi])
|
||||
assert report.any_stuck is True
|
||||
|
||||
|
||||
def test_report_all_idle():
|
||||
report = AgentHealthReport(
|
||||
agents=[AgentStatus(agent="claude"), AgentStatus(agent="kimi")]
|
||||
)
|
||||
assert report.all_idle is True
|
||||
|
||||
|
||||
def test_report_for_agent_found():
|
||||
kimi = AgentStatus(agent="kimi", active_issue_numbers=[42])
|
||||
report = AgentHealthReport(agents=[AgentStatus(agent="claude"), kimi])
|
||||
found = report.for_agent("kimi")
|
||||
assert found is kimi
|
||||
|
||||
|
||||
def test_report_for_agent_not_found():
|
||||
report = AgentHealthReport(agents=[AgentStatus(agent="claude")])
|
||||
assert report.for_agent("timmy") is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# check_agent_health — no Gitea in unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_agent_health_unknown_agent():
|
||||
"""Unknown agent name returns idle status without error."""
|
||||
from timmy.vassal.agent_health import check_agent_health
|
||||
|
||||
status = await check_agent_health("unknown-bot")
|
||||
assert status.agent == "unknown-bot"
|
||||
assert status.is_idle is True
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_check_agent_health_no_token():
|
||||
"""Returns idle status gracefully when Gitea token is absent."""
|
||||
from timmy.vassal.agent_health import check_agent_health
|
||||
|
||||
status = await check_agent_health("claude")
|
||||
# Should not raise; returns idle (no active issues discovered)
|
||||
assert isinstance(status, AgentStatus)
|
||||
assert status.agent == "claude"
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_full_health_report_returns_both_agents():
|
||||
from timmy.vassal.agent_health import get_full_health_report
|
||||
|
||||
report = await get_full_health_report()
|
||||
agent_names = {a.agent for a in report.agents}
|
||||
assert "claude" in agent_names
|
||||
assert "kimi" in agent_names
|
||||
186
tests/unit/test_vassal_backlog.py
Normal file
186
tests/unit/test_vassal_backlog.py
Normal file
@@ -0,0 +1,186 @@
|
||||
"""Unit tests for timmy.vassal.backlog — triage and fetch helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.vassal.backlog import (
|
||||
AgentTarget,
|
||||
TriagedIssue,
|
||||
_choose_agent,
|
||||
_extract_labels,
|
||||
_score_priority,
|
||||
triage_issues,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _extract_labels
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_extract_labels_empty():
|
||||
assert _extract_labels({}) == []
|
||||
|
||||
|
||||
def test_extract_labels_normalises_case():
|
||||
issue = {"labels": [{"name": "HIGH"}, {"name": "Feature"}]}
|
||||
assert _extract_labels(issue) == ["high", "feature"]
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _score_priority
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_priority_urgent():
|
||||
assert _score_priority(["urgent"], []) == 100
|
||||
|
||||
|
||||
def test_priority_high():
|
||||
assert _score_priority(["high"], []) == 75
|
||||
|
||||
|
||||
def test_priority_normal_default():
|
||||
assert _score_priority([], []) == 50
|
||||
|
||||
|
||||
def test_priority_assigned_penalised():
|
||||
# already assigned → subtract 20
|
||||
score = _score_priority([], ["some-agent"])
|
||||
assert score == 30
|
||||
|
||||
|
||||
def test_priority_label_substring_match():
|
||||
# "critical" contains "critical" → 90
|
||||
assert _score_priority(["critical-bug"], []) == 90
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _choose_agent
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_choose_claude_for_architecture():
|
||||
target, rationale = _choose_agent("Refactor auth middleware", "", [])
|
||||
assert target == AgentTarget.CLAUDE
|
||||
assert "complex" in rationale or "high-complexity" in rationale
|
||||
|
||||
|
||||
def test_choose_kimi_for_research():
|
||||
target, rationale = _choose_agent("Deep research on embedding models", "", [])
|
||||
assert target == AgentTarget.KIMI
|
||||
|
||||
|
||||
def test_choose_timmy_for_docs():
|
||||
target, rationale = _choose_agent("Update documentation for CLI", "", [])
|
||||
assert target == AgentTarget.TIMMY
|
||||
|
||||
|
||||
def test_choose_timmy_default():
|
||||
target, rationale = _choose_agent("Fix typo in README", "simple change", [])
|
||||
# Could route to timmy (docs/trivial) or default — either is valid
|
||||
assert isinstance(target, AgentTarget)
|
||||
|
||||
|
||||
def test_choose_agent_label_wins():
|
||||
# "security" label → Claude
|
||||
target, _ = _choose_agent("Login page", "", ["security"])
|
||||
assert target == AgentTarget.CLAUDE
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# triage_issues
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def _make_raw_issue(
|
||||
number: int,
|
||||
title: str,
|
||||
body: str = "",
|
||||
labels: list[str] | None = None,
|
||||
assignees: list[str] | None = None,
|
||||
) -> dict:
|
||||
return {
|
||||
"number": number,
|
||||
"title": title,
|
||||
"body": body,
|
||||
"labels": [{"name": lbl} for lbl in (labels or [])],
|
||||
"assignees": [{"login": a} for a in (assignees or [])],
|
||||
"html_url": f"http://gitea/issues/{number}",
|
||||
}
|
||||
|
||||
|
||||
def test_triage_returns_sorted_by_priority():
|
||||
issues = [
|
||||
_make_raw_issue(1, "Routine docs update", labels=["docs"]),
|
||||
_make_raw_issue(2, "Critical security issue", labels=["urgent", "security"]),
|
||||
_make_raw_issue(3, "Normal feature", labels=[]),
|
||||
]
|
||||
triaged = triage_issues(issues)
|
||||
# Highest priority first
|
||||
assert triaged[0].number == 2
|
||||
assert triaged[0].priority_score == 100 # urgent label
|
||||
|
||||
|
||||
def test_triage_prs_can_be_included():
|
||||
# triage_issues does not filter PRs — that's fetch_open_issues's job
|
||||
issues = [_make_raw_issue(10, "A PR-like issue")]
|
||||
triaged = triage_issues(issues)
|
||||
assert len(triaged) == 1
|
||||
|
||||
|
||||
def test_triage_empty():
|
||||
assert triage_issues([]) == []
|
||||
|
||||
|
||||
def test_triage_routing():
|
||||
issues = [
|
||||
_make_raw_issue(1, "Benchmark LLM backends", body="comprehensive analysis"),
|
||||
_make_raw_issue(2, "Refactor agent loader", body="architecture change"),
|
||||
_make_raw_issue(3, "Fix typo in docs", labels=["docs"]),
|
||||
]
|
||||
triaged = {i.number: i for i in triage_issues(issues)}
|
||||
|
||||
assert triaged[1].agent_target == AgentTarget.KIMI
|
||||
assert triaged[2].agent_target == AgentTarget.CLAUDE
|
||||
assert triaged[3].agent_target == AgentTarget.TIMMY
|
||||
|
||||
|
||||
def test_triage_preserves_url():
|
||||
issues = [_make_raw_issue(42, "Some issue")]
|
||||
triaged = triage_issues(issues)
|
||||
assert triaged[0].url == "http://gitea/issues/42"
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# fetch_open_issues — no Gitea available in unit tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_fetch_open_issues_returns_empty_when_disabled(monkeypatch):
|
||||
"""When Gitea is disabled, fetch returns [] without raising."""
|
||||
import timmy.vassal.backlog as bl
|
||||
|
||||
# Patch settings
|
||||
class FakeSettings:
|
||||
gitea_enabled = False
|
||||
gitea_token = ""
|
||||
gitea_url = "http://localhost:3000"
|
||||
gitea_repo = "owner/repo"
|
||||
|
||||
monkeypatch.setattr(bl, "logger", bl.logger) # no-op just to confirm import
|
||||
|
||||
# We can't easily monkeypatch `from config import settings` inside the function,
|
||||
# so test the no-token path via environment
|
||||
import os
|
||||
|
||||
original = os.environ.pop("GITEA_TOKEN", None)
|
||||
try:
|
||||
result = await bl.fetch_open_issues()
|
||||
# Should return [] gracefully (no token configured by default in test env)
|
||||
assert isinstance(result, list)
|
||||
finally:
|
||||
if original is not None:
|
||||
os.environ["GITEA_TOKEN"] = original
|
||||
114
tests/unit/test_vassal_dispatch.py
Normal file
114
tests/unit/test_vassal_dispatch.py
Normal file
@@ -0,0 +1,114 @@
|
||||
"""Unit tests for timmy.vassal.dispatch — routing and label helpers."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.vassal.backlog import AgentTarget, TriagedIssue
|
||||
from timmy.vassal.dispatch import (
|
||||
DispatchRecord,
|
||||
clear_dispatch_registry,
|
||||
get_dispatch_registry,
|
||||
)
|
||||
|
||||
|
||||
def _make_triaged(
|
||||
number: int,
|
||||
title: str,
|
||||
agent: AgentTarget,
|
||||
priority: int = 50,
|
||||
) -> TriagedIssue:
|
||||
return TriagedIssue(
|
||||
number=number,
|
||||
title=title,
|
||||
body="",
|
||||
agent_target=agent,
|
||||
priority_score=priority,
|
||||
rationale="test rationale",
|
||||
url=f"http://gitea/issues/{number}",
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Registry helpers
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_registry_starts_empty():
|
||||
clear_dispatch_registry()
|
||||
assert get_dispatch_registry() == {}
|
||||
|
||||
|
||||
def test_registry_returns_copy():
|
||||
clear_dispatch_registry()
|
||||
reg = get_dispatch_registry()
|
||||
reg[999] = None # type: ignore[assignment]
|
||||
assert 999 not in get_dispatch_registry()
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# dispatch_issue — Timmy self-dispatch (no Gitea required)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dispatch_timmy_self_no_gitea():
|
||||
"""Timmy self-dispatch records without hitting Gitea."""
|
||||
clear_dispatch_registry()
|
||||
|
||||
issue = _make_triaged(1, "Fix docs typo", AgentTarget.TIMMY)
|
||||
from timmy.vassal.dispatch import dispatch_issue
|
||||
|
||||
record = await dispatch_issue(issue)
|
||||
|
||||
assert isinstance(record, DispatchRecord)
|
||||
assert record.issue_number == 1
|
||||
assert record.agent == AgentTarget.TIMMY
|
||||
assert 1 in get_dispatch_registry()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dispatch_claude_no_gitea_token():
|
||||
"""Claude dispatch gracefully degrades when Gitea token is absent."""
|
||||
clear_dispatch_registry()
|
||||
|
||||
issue = _make_triaged(2, "Refactor auth", AgentTarget.CLAUDE)
|
||||
from timmy.vassal.dispatch import dispatch_issue
|
||||
|
||||
record = await dispatch_issue(issue)
|
||||
|
||||
assert record.issue_number == 2
|
||||
assert record.agent == AgentTarget.CLAUDE
|
||||
# label/comment not applied — no token
|
||||
assert record.label_applied is False
|
||||
assert 2 in get_dispatch_registry()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_dispatch_kimi_no_gitea_token():
|
||||
clear_dispatch_registry()
|
||||
|
||||
issue = _make_triaged(3, "Research embeddings", AgentTarget.KIMI)
|
||||
from timmy.vassal.dispatch import dispatch_issue
|
||||
|
||||
record = await dispatch_issue(issue)
|
||||
|
||||
assert record.agent == AgentTarget.KIMI
|
||||
assert record.label_applied is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# DispatchRecord fields
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_dispatch_record_defaults():
|
||||
r = DispatchRecord(
|
||||
issue_number=5,
|
||||
issue_title="Test issue",
|
||||
agent=AgentTarget.TIMMY,
|
||||
rationale="because",
|
||||
)
|
||||
assert r.label_applied is False
|
||||
assert r.comment_posted is False
|
||||
assert r.dispatched_at # has a timestamp
|
||||
116
tests/unit/test_vassal_house_health.py
Normal file
116
tests/unit/test_vassal_house_health.py
Normal file
@@ -0,0 +1,116 @@
|
||||
"""Unit tests for timmy.vassal.house_health."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.vassal.house_health import (
|
||||
DiskUsage,
|
||||
MemoryUsage,
|
||||
OllamaHealth,
|
||||
SystemSnapshot,
|
||||
_probe_disk,
|
||||
)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Data model tests
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_system_snapshot_healthy_when_no_warnings():
|
||||
snap = SystemSnapshot()
|
||||
assert snap.healthy is True
|
||||
|
||||
|
||||
def test_system_snapshot_unhealthy_with_warnings():
|
||||
snap = SystemSnapshot(warnings=["disk 90% full"])
|
||||
assert snap.healthy is False
|
||||
|
||||
|
||||
def test_disk_usage_defaults():
|
||||
d = DiskUsage()
|
||||
assert d.percent_used == 0.0
|
||||
assert d.path == "/"
|
||||
|
||||
|
||||
def test_memory_usage_defaults():
|
||||
m = MemoryUsage()
|
||||
assert m.percent_used == 0.0
|
||||
|
||||
|
||||
def test_ollama_health_defaults():
|
||||
o = OllamaHealth()
|
||||
assert o.reachable is False
|
||||
assert o.loaded_models == []
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# _probe_disk — runs against real filesystem
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_probe_disk_root():
|
||||
result = _probe_disk("/")
|
||||
assert result.total_gb > 0
|
||||
assert 0.0 <= result.percent_used <= 100.0
|
||||
assert result.free_gb >= 0
|
||||
|
||||
|
||||
def test_probe_disk_bad_path():
|
||||
result = _probe_disk("/nonexistent_path_xyz")
|
||||
# Should not raise — returns zeroed DiskUsage
|
||||
assert result.percent_used == 0.0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# get_system_snapshot — async
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_system_snapshot_returns_snapshot():
|
||||
from timmy.vassal.house_health import get_system_snapshot
|
||||
|
||||
snap = await get_system_snapshot()
|
||||
assert isinstance(snap, SystemSnapshot)
|
||||
# Disk is always probed
|
||||
assert snap.disk.total_gb >= 0
|
||||
# Ollama is likely unreachable in test env — that's fine
|
||||
assert isinstance(snap.ollama, OllamaHealth)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_system_snapshot_disk_warning(monkeypatch):
|
||||
"""When disk is above threshold, a warning is generated."""
|
||||
import timmy.vassal.house_health as hh
|
||||
|
||||
# Patch _probe_disk to return high usage
|
||||
def _full_disk(path: str) -> DiskUsage:
|
||||
return DiskUsage(
|
||||
path=path,
|
||||
total_gb=100.0,
|
||||
used_gb=90.0,
|
||||
free_gb=10.0,
|
||||
percent_used=90.0,
|
||||
)
|
||||
|
||||
monkeypatch.setattr(hh, "_probe_disk", _full_disk)
|
||||
|
||||
snap = await hh.get_system_snapshot()
|
||||
assert any("disk" in w.lower() or "Disk" in w for w in snap.warnings)
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# cleanup_stale_files — temp dir test
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_cleanup_stale_files_missing_dir():
|
||||
"""Should not raise when the target dir doesn't exist."""
|
||||
from timmy.vassal.house_health import cleanup_stale_files
|
||||
|
||||
result = await cleanup_stale_files(temp_dirs=["/tmp/timmy_test_xyz_nonexistent"])
|
||||
assert result["deleted_count"] == 0
|
||||
assert result["errors"] == []
|
||||
139
tests/unit/test_vassal_orchestration_loop.py
Normal file
139
tests/unit/test_vassal_orchestration_loop.py
Normal file
@@ -0,0 +1,139 @@
|
||||
"""Unit tests for timmy.vassal.orchestration_loop — VassalOrchestrator."""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import pytest
|
||||
|
||||
from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# VassalCycleRecord
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_cycle_record_healthy_when_no_errors():
|
||||
r = VassalCycleRecord(
|
||||
cycle_id=1,
|
||||
started_at="2026-01-01T00:00:00+00:00",
|
||||
)
|
||||
assert r.healthy is True
|
||||
|
||||
|
||||
def test_cycle_record_unhealthy_with_errors():
|
||||
r = VassalCycleRecord(
|
||||
cycle_id=1,
|
||||
started_at="2026-01-01T00:00:00+00:00",
|
||||
errors=["backlog: connection refused"],
|
||||
)
|
||||
assert r.healthy is False
|
||||
|
||||
|
||||
def test_cycle_record_unhealthy_with_warnings():
|
||||
r = VassalCycleRecord(
|
||||
cycle_id=1,
|
||||
started_at="2026-01-01T00:00:00+00:00",
|
||||
house_warnings=["disk 90% full"],
|
||||
)
|
||||
assert r.healthy is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# VassalOrchestrator state
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_orchestrator_initial_state():
|
||||
orch = VassalOrchestrator()
|
||||
assert orch.cycle_count == 0
|
||||
assert orch.is_running is False
|
||||
assert orch.history == []
|
||||
|
||||
|
||||
def test_orchestrator_get_status_no_cycles():
|
||||
orch = VassalOrchestrator()
|
||||
status = orch.get_status()
|
||||
assert status["running"] is False
|
||||
assert status["cycle_count"] == 0
|
||||
assert status["last_cycle"] is None
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# run_cycle — integration (no Gitea, no Ollama in test env)
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_cycle_completes_without_services():
|
||||
"""run_cycle must complete and record even when external services are down."""
|
||||
from timmy.vassal.dispatch import clear_dispatch_registry
|
||||
|
||||
clear_dispatch_registry()
|
||||
orch = VassalOrchestrator(cycle_interval=300)
|
||||
|
||||
record = await orch.run_cycle()
|
||||
|
||||
assert isinstance(record, VassalCycleRecord)
|
||||
assert record.cycle_id == 1
|
||||
assert record.finished_at # was set
|
||||
assert record.duration_ms >= 0
|
||||
# No Gitea → fetched = 0, dispatched = 0
|
||||
assert record.issues_fetched == 0
|
||||
assert record.issues_dispatched == 0
|
||||
# History updated
|
||||
assert len(orch.history) == 1
|
||||
assert orch.cycle_count == 1
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_run_cycle_increments_cycle_count():
|
||||
from timmy.vassal.dispatch import clear_dispatch_registry
|
||||
|
||||
clear_dispatch_registry()
|
||||
orch = VassalOrchestrator()
|
||||
|
||||
await orch.run_cycle()
|
||||
await orch.run_cycle()
|
||||
|
||||
assert orch.cycle_count == 2
|
||||
assert len(orch.history) == 2
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_get_status_after_cycle():
|
||||
from timmy.vassal.dispatch import clear_dispatch_registry
|
||||
|
||||
clear_dispatch_registry()
|
||||
orch = VassalOrchestrator()
|
||||
|
||||
await orch.run_cycle()
|
||||
status = orch.get_status()
|
||||
|
||||
assert status["cycle_count"] == 1
|
||||
last = status["last_cycle"]
|
||||
assert last is not None
|
||||
assert last["cycle_id"] == 1
|
||||
assert last["issues_fetched"] == 0
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# start / stop
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_orchestrator_stop_when_not_running():
|
||||
"""stop() on an idle orchestrator must not raise."""
|
||||
orch = VassalOrchestrator()
|
||||
orch.stop() # should be a no-op
|
||||
assert orch.is_running is False
|
||||
|
||||
|
||||
# ---------------------------------------------------------------------------
|
||||
# Module-level singleton
|
||||
# ---------------------------------------------------------------------------
|
||||
|
||||
|
||||
def test_module_singleton_exists():
|
||||
from timmy.vassal import vassal_orchestrator, VassalOrchestrator
|
||||
|
||||
assert isinstance(vassal_orchestrator, VassalOrchestrator)
|
||||
21
tox.ini
21
tox.ini
@@ -47,12 +47,10 @@ commands =
|
||||
# ── Test Environments ────────────────────────────────────────────────────────
|
||||
|
||||
[testenv:unit]
|
||||
description = Fast tests — excludes e2e, functional, and external services
|
||||
description = Fast unit tests — only tests marked @pytest.mark.unit
|
||||
commands =
|
||||
pytest tests/ -q --tb=short \
|
||||
--ignore=tests/e2e \
|
||||
--ignore=tests/functional \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
|
||||
-m "unit and not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
|
||||
-n auto --dist worksteal
|
||||
|
||||
[testenv:integration]
|
||||
@@ -104,7 +102,7 @@ commands =
|
||||
--cov-report=xml:reports/coverage.xml \
|
||||
--cov-fail-under=73 \
|
||||
--junitxml=reports/junit.xml \
|
||||
-n auto --dist worksteal \
|
||||
-p no:xdist \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow"
|
||||
|
||||
[testenv:coverage]
|
||||
@@ -115,7 +113,7 @@ commands =
|
||||
--cov-report=term-missing \
|
||||
--cov-report=xml \
|
||||
--cov-fail-under=73 \
|
||||
-n auto --dist worksteal \
|
||||
-p no:xdist \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not slow"
|
||||
|
||||
[testenv:coverage-html]
|
||||
@@ -126,16 +124,7 @@ commands =
|
||||
--cov-report=term-missing \
|
||||
--cov-report=html \
|
||||
--cov-fail-under=73 \
|
||||
-n auto --dist worksteal \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not slow"
|
||||
|
||||
[testenv:coverage-parallel]
|
||||
description = Parallel coverage report
|
||||
commands =
|
||||
pytest tests/ -q --tb=short \
|
||||
--cov=src \
|
||||
--cov-report=term-missing \
|
||||
-n auto --dist worksteal \
|
||||
-p no:xdist \
|
||||
-m "not ollama and not docker and not selenium and not external_api and not slow"
|
||||
|
||||
# ── Pre-push (mirrors CI exactly) ────────────────────────────────────────────
|
||||
|
||||
Reference in New Issue
Block a user