Compare commits

..

2 Commits

Author SHA1 Message Date
Alexander Whitestone
2ffbf9cc96 fix: Enable xdist with coverage in tox.ini Fixes #932 2026-03-23 22:12:26 -04:00
Alexander Whitestone
f0bf778197 WIP: Gemini Code progress on #932
Automated salvage commit — agent session ended (exit 124).
Work in progress, may need continuation.
2026-03-23 14:34:27 -04:00
18 changed files with 37 additions and 4744 deletions

View File

@@ -304,16 +304,6 @@ class Settings(BaseSettings):
mcp_timeout: int = 15
mcp_bridge_timeout: int = 60 # HTTP timeout for MCP bridge Ollama calls (seconds)
# ── Backlog Triage Loop ────────────────────────────────────────────
# Autonomous loop: fetch open issues, score, assign to agents.
backlog_triage_enabled: bool = False
# Seconds between triage cycles (default: 15 minutes).
backlog_triage_interval_seconds: int = 900
# When True, score and summarize but don't write to Gitea.
backlog_triage_dry_run: bool = False
# Create a daily triage summary issue/comment.
backlog_triage_daily_summary: bool = True
# ── Loop QA (Self-Testing) ─────────────────────────────────────────
# Self-test orchestrator that probes capabilities alongside the thinking loop.
loop_qa_enabled: bool = True
@@ -321,15 +311,6 @@ class Settings(BaseSettings):
loop_qa_upgrade_threshold: int = 3 # consecutive failures → file task
loop_qa_max_per_hour: int = 12 # safety throttle
# ── Vassal Protocol (Autonomous Orchestrator) ─────────────────────
# Timmy as lead decision-maker: triage backlog, dispatch agents, monitor health.
# See timmy/vassal/ for implementation.
vassal_enabled: bool = False # off by default — enable when Qwen3-14B is loaded
vassal_cycle_interval: int = 300 # seconds between orchestration cycles (5 min)
vassal_max_dispatch_per_cycle: int = 10 # cap on new dispatches per cycle
vassal_stuck_threshold_minutes: int = 120 # minutes before agent issue is "stuck"
vassal_idle_threshold_minutes: int = 30 # minutes before agent is "idle"
# ── Paperclip AI — orchestration bridge ────────────────────────────
# URL where the Paperclip server listens.
# For VPS deployment behind nginx, use the public domain.

View File

@@ -1,759 +0,0 @@
"""Autonomous backlog triage loop — Timmy scans Gitea and assigns work.
Continuously fetches open issues, scores/prioritizes them, and decides
what to work on next without waiting to be asked.
Loop flow::
while true:
1. Fetch all open issues from Gitea API
2. Score/prioritize by labels, age, type, blocked status
3. Identify unassigned high-priority items
4. Decide: assign to claude, dispatch to kimi, or flag for Alex
5. Execute the assignment (comment + assign)
6. Optionally post a daily triage summary
7. Sleep for configurable interval (default 15 min)
Priority tiers:
P0 — security, data loss, blocking bugs → immediate action
P1 — core functionality, ready issues → next sprint
P2 — improvements, low-score issues → backlog
P3 — philosophy, meta → someday/never (skip in triage)
Usage::
from timmy.backlog_triage import BacklogTriageLoop
loop = BacklogTriageLoop()
await loop.run_once() # single triage cycle
await loop.start() # background daemon loop
loop.stop() # graceful shutdown
"""
from __future__ import annotations
import asyncio
import logging
import re
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any
import httpx
from config import settings
logger = logging.getLogger(__name__)
# ── Constants ────────────────────────────────────────────────────────────────
# Minimum triage score to be considered "ready" for assignment
READY_THRESHOLD = 5
# Agent Gitea logins
AGENT_CLAUDE = "claude"
AGENT_KIMI = "kimi"
OWNER_LOGIN = "rockachopa" # Alex — human owner
# Labels
KIMI_READY_LABEL = "kimi-ready"
TRIAGE_DONE_LABEL = "triage-done"
# Tag sets (mirrors scripts/triage_score.py)
_BUG_TAGS = frozenset({"bug", "broken", "crash", "error", "fix", "regression", "hotfix"})
_FEATURE_TAGS = frozenset({"feature", "feat", "enhancement", "capability", "timmy-capability"})
_REFACTOR_TAGS = frozenset({"refactor", "cleanup", "tech-debt", "optimization", "perf"})
_META_TAGS = frozenset({"philosophy", "soul-gap", "discussion", "question", "rfc"})
_P0_TAGS = frozenset({"security", "data-loss", "blocking", "p0", "critical"})
_RESEARCH_TAGS = frozenset({"research", "kimi-ready", "investigation", "spike"})
_LOOP_TAG = "loop-generated"
# Regex patterns for scoring
_TAG_RE = re.compile(r"\[([^\]]+)\]")
_FILE_RE = re.compile(r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE)
_FUNC_RE = re.compile(r"(?:def |class |function |method |`\w+\(\)`)", re.IGNORECASE)
_ACCEPT_RE = re.compile(
r"(?:should|must|expect|verify|assert|test.?case|acceptance|criteria"
r"|pass(?:es|ing)|fail(?:s|ing)|return(?:s)?|raise(?:s)?)",
re.IGNORECASE,
)
_TEST_RE = re.compile(r"(?:tox|pytest|test_\w+|\.test\.|assert\s)", re.IGNORECASE)
_BLOCKED_RE = re.compile(r"\bblock(?:ed|s|ing)\b", re.IGNORECASE)
# ── Data types ───────────────────────────────────────────────────────────────
@dataclass
class ScoredIssue:
"""A Gitea issue enriched with triage scoring."""
number: int
title: str
body: str
labels: list[str]
tags: set[str]
assignees: list[str]
created_at: datetime
issue_type: str # bug | feature | refactor | philosophy | research | unknown
score: int = 0
scope: int = 0
acceptance: int = 0
alignment: int = 0
ready: bool = False
age_days: int = 0
is_p0: bool = False
is_blocked: bool = False
@property
def is_unassigned(self) -> bool:
return len(self.assignees) == 0
@property
def needs_kimi(self) -> bool:
return bool(self.tags & _RESEARCH_TAGS) or KIMI_READY_LABEL in self.labels
@dataclass
class TriageDecision:
"""The outcome of a triage decision for a single issue."""
issue_number: int
action: str # "assign_claude" | "assign_kimi" | "flag_alex" | "skip"
reason: str
agent: str = "" # the agent assigned (login)
executed: bool = False
error: str = ""
@dataclass
class TriageCycleResult:
"""Summary of one complete triage cycle."""
timestamp: str
total_open: int
scored: int
ready: int
decisions: list[TriageDecision] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
duration_ms: int = 0
# ── Scoring ──────────────────────────────────────────────────────────────────
def _extract_tags(title: str, labels: list[str]) -> set[str]:
"""Pull tags from [bracket] title notation + Gitea label names."""
tags: set[str] = set()
for m in _TAG_RE.finditer(title):
tags.add(m.group(1).lower().strip())
for lbl in labels:
tags.add(lbl.lower().strip())
return tags
def _score_scope(title: str, body: str, tags: set[str]) -> int:
"""03: How well-scoped is this issue?"""
text = f"{title}\n{body}"
score = 0
if _FILE_RE.search(text):
score += 1
if _FUNC_RE.search(text):
score += 1
clean = _TAG_RE.sub("", title).strip()
if len(clean) < 80:
score += 1
if tags & _META_TAGS:
score = max(0, score - 2)
return min(3, score)
def _score_acceptance(title: str, body: str, tags: set[str]) -> int:
"""03: Does this have clear acceptance criteria?"""
text = f"{title}\n{body}"
score = 0
matches = len(_ACCEPT_RE.findall(text))
if matches >= 3:
score += 2
elif matches >= 1:
score += 1
if _TEST_RE.search(text):
score += 1
if re.search(r"##\s*(problem|solution|expected|actual|steps)", body, re.IGNORECASE):
score += 1
if tags & _META_TAGS:
score = max(0, score - 1)
return min(3, score)
def _score_alignment(title: str, body: str, tags: set[str]) -> int:
"""03: How aligned is this with the north star?"""
score = 0
if tags & _BUG_TAGS:
return 3
if tags & _REFACTOR_TAGS:
score += 2
if tags & _FEATURE_TAGS:
score += 2
if _LOOP_TAG in tags:
score += 1
if tags & _META_TAGS:
score = 0
return min(3, score)
def score_issue(issue: dict[str, Any]) -> ScoredIssue:
"""Score and classify a raw Gitea issue dict."""
number = issue["number"]
title = issue.get("title", "")
body = issue.get("body") or ""
label_names = [lbl["name"] for lbl in issue.get("labels", [])]
tags = _extract_tags(title, label_names)
assignees = [a["login"] for a in issue.get("assignees", [])]
# Parse created_at
raw_ts = issue.get("created_at", "")
try:
created_at = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
except (ValueError, AttributeError):
created_at = datetime.now(UTC)
age_days = (datetime.now(UTC) - created_at).days
# Scores
scope = _score_scope(title, body, tags)
acceptance = _score_acceptance(title, body, tags)
alignment = _score_alignment(title, body, tags)
total = scope + acceptance + alignment
# Classify
if tags & _BUG_TAGS:
issue_type = "bug"
elif tags & _RESEARCH_TAGS:
issue_type = "research"
elif tags & _FEATURE_TAGS:
issue_type = "feature"
elif tags & _REFACTOR_TAGS:
issue_type = "refactor"
elif tags & _META_TAGS:
issue_type = "philosophy"
else:
issue_type = "unknown"
is_p0 = bool(tags & _P0_TAGS) or issue_type == "bug"
is_blocked = bool(_BLOCKED_RE.search(title) or _BLOCKED_RE.search(body))
return ScoredIssue(
number=number,
title=_TAG_RE.sub("", title).strip(),
body=body,
labels=label_names,
tags=tags,
assignees=assignees,
created_at=created_at,
issue_type=issue_type,
score=total,
scope=scope,
acceptance=acceptance,
alignment=alignment,
ready=total >= READY_THRESHOLD,
age_days=age_days,
is_p0=is_p0,
is_blocked=is_blocked,
)
# ── Decision logic ───────────────────────────────────────────────────────────
def decide(issue: ScoredIssue) -> TriageDecision:
"""Decide what to do with an issue.
Returns a TriageDecision with action, reason, and agent.
Decision is not yet executed — call execute_decision() for that.
"""
num = issue.number
# Skip philosophy/meta — not dev-actionable
if issue.issue_type == "philosophy":
return TriageDecision(
issue_number=num,
action="skip",
reason="Philosophy/meta issue — not dev-actionable in the triage loop.",
)
# Skip already-assigned issues
if not issue.is_unassigned:
return TriageDecision(
issue_number=num,
action="skip",
reason=f"Already assigned to: {', '.join(issue.assignees)}.",
)
# Skip if not ready (low score)
if not issue.ready:
return TriageDecision(
issue_number=num,
action="skip",
reason=f"Score {issue.score} < {READY_THRESHOLD} threshold — needs more detail before assignment.",
)
# Blocked: flag for Alex
if issue.is_blocked:
return TriageDecision(
issue_number=num,
action="flag_alex",
agent=OWNER_LOGIN,
reason=(
"Issue appears blocked. Flagging for @rockachopa to unblock before autonomous assignment."
),
)
# Research / Kimi-ready
if issue.needs_kimi:
return TriageDecision(
issue_number=num,
action="assign_kimi",
agent=AGENT_KIMI,
reason=(
f"Issue type '{issue.issue_type}' with research/investigation scope. "
f"Assigning kimi-ready label for Kimi agent to pick up."
),
)
# P0 bugs and blocking issues → Claude immediately
if issue.is_p0:
return TriageDecision(
issue_number=num,
action="assign_claude",
agent=AGENT_CLAUDE,
reason=(
f"P0/{issue.issue_type} issue (score={issue.score}, age={issue.age_days}d). "
f"Assigning to Claude Code for immediate attention."
),
)
# Everything else that is ready → Claude Code
return TriageDecision(
issue_number=num,
action="assign_claude",
agent=AGENT_CLAUDE,
reason=(
f"Unassigned ready issue (type={issue.issue_type}, score={issue.score}, "
f"age={issue.age_days}d). Assigning to Claude Code."
),
)
# ── Gitea API client ─────────────────────────────────────────────────────────
def _api_headers() -> dict[str, str]:
return {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _repo_url(path: str) -> str:
owner, repo = settings.gitea_repo.split("/", 1)
return f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/{path}"
async def fetch_open_issues(client: httpx.AsyncClient) -> list[dict[str, Any]]:
"""Fetch all open issues from Gitea, paginating as needed."""
all_issues: list[dict[str, Any]] = []
page = 1
while True:
url = _repo_url(f"issues?state=open&type=issues&limit=50&page={page}")
try:
resp = await client.get(url, headers=_api_headers())
if resp.status_code != 200:
logger.warning("Gitea issues fetch failed (HTTP %s)", resp.status_code)
break
batch: list[dict[str, Any]] = resp.json()
if not batch:
break
all_issues.extend(batch)
if len(batch) < 50:
break
page += 1
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Gitea connection error fetching issues: %s", exc)
break
return all_issues
async def post_comment(
client: httpx.AsyncClient,
issue_number: int,
body: str,
) -> bool:
"""Post a comment on a Gitea issue. Returns True on success."""
url = _repo_url(f"issues/{issue_number}/comments")
try:
resp = await client.post(url, headers=_api_headers(), json={"body": body})
return resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to post comment on #%d: %s", issue_number, exc)
return False
async def assign_issue(
client: httpx.AsyncClient,
issue_number: int,
assignee: str,
) -> bool:
"""Assign an issue to a Gitea user. Returns True on success."""
url = _repo_url(f"issues/{issue_number}")
try:
resp = await client.patch(
url,
headers=_api_headers(),
json={"assignees": [assignee]},
)
return resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to assign #%d to %s: %s", issue_number, assignee, exc)
return False
async def add_label(
client: httpx.AsyncClient,
issue_number: int,
label_name: str,
) -> bool:
"""Add a label to a Gitea issue by name (auto-creates if missing). Returns True on success."""
owner, repo = settings.gitea_repo.split("/", 1)
labels_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/labels"
headers = _api_headers()
try:
# Fetch existing labels
resp = await client.get(labels_url, headers=headers)
if resp.status_code != 200:
return False
existing = {lbl["name"]: lbl["id"] for lbl in resp.json()}
if label_name in existing:
label_id = existing[label_name]
else:
# Auto-create the label
create_resp = await client.post(
labels_url,
headers=headers,
json={"name": label_name, "color": "#006b75"},
)
if create_resp.status_code not in (200, 201):
return False
label_id = create_resp.json()["id"]
# Apply to the issue
apply_url = _repo_url(f"issues/{issue_number}/labels")
apply_resp = await client.post(
apply_url, headers=headers, json={"labels": [label_id]}
)
return apply_resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to add label %r to #%d: %s", label_name, issue_number, exc)
return False
# ── Decision execution ───────────────────────────────────────────────────────
async def execute_decision(
client: httpx.AsyncClient,
decision: TriageDecision,
dry_run: bool = False,
) -> TriageDecision:
"""Execute a triage decision — comment + assign/label.
When dry_run=True, logs the decision but makes no Gitea API calls.
Returns the updated decision with executed=True on success.
"""
num = decision.issue_number
if decision.action == "skip":
logger.debug("Triage skip #%d: %s", num, decision.reason)
decision.executed = True
return decision
audit_comment = _build_audit_comment(decision)
if dry_run:
logger.info(
"[DRY RUN] #%d%s (%s): %s",
num,
decision.action,
decision.agent,
decision.reason,
)
decision.executed = True
return decision
# Post audit comment first (always, so Alex can see reasoning)
comment_ok = await post_comment(client, num, audit_comment)
if not comment_ok:
decision.error = "Failed to post audit comment"
logger.warning("Triage #%d: comment failed", num)
return decision
# Execute assignment
ok = False
if decision.action == "assign_claude":
ok = await assign_issue(client, num, AGENT_CLAUDE)
elif decision.action == "assign_kimi":
ok = await add_label(client, num, KIMI_READY_LABEL)
elif decision.action == "flag_alex":
# Comment already posted above — that's sufficient for flagging
ok = True
if ok:
decision.executed = True
logger.info("Triage #%d%s OK", num, decision.action)
else:
decision.error = f"Action {decision.action!r} failed"
logger.warning("Triage #%d: action %r failed", num, decision.action)
return decision
def _build_audit_comment(decision: TriageDecision) -> str:
"""Build the audit trail comment that Alex can read to see reasoning."""
ts = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
action_text = {
"assign_claude": f"Assigning to @{AGENT_CLAUDE} for implementation.",
"assign_kimi": f"Adding `{KIMI_READY_LABEL}` label — queuing for Kimi research agent.",
"flag_alex": f"Flagging for @{OWNER_LOGIN} — issue appears blocked or needs human decision.",
}.get(decision.action, decision.action)
return (
f"**[Timmy Triage — {ts}]**\n\n"
f"**Decision:** {action_text}\n\n"
f"**Why:** {decision.reason}\n\n"
f"*Autonomous triage by Timmy. Reply to override.*"
)
# ── Daily summary ─────────────────────────────────────────────────────────────
def _build_daily_summary(result: TriageCycleResult, scored: list[ScoredIssue]) -> str:
"""Build the daily triage summary body."""
now = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
assigned = [d for d in result.decisions if d.executed and d.action != "skip"]
skipped = [d for d in result.decisions if d.action == "skip"]
lines = [
f"# Timmy Backlog Triage — {now}",
"",
f"**Open issues:** {result.total_open} | "
f"**Scored:** {result.scored} | "
f"**Ready:** {result.ready} | "
f"**Assigned this cycle:** {len(assigned)}",
"",
"## Top 10 Ready Issues (by score)",
"",
]
top = sorted([s for s in scored if s.ready], key=lambda s: (-s.score, s.number))[:10]
for s in top:
flag = "🐛" if s.issue_type == "bug" else "" if s.is_p0 else ""
lines.append(
f"- {flag} **#{s.number}** (score={s.score}, age={s.age_days}d) — {s.title[:80]}"
)
if assigned:
lines += ["", "## Actions Taken", ""]
for d in assigned:
lines.append(f"- #{d.issue_number} → `{d.action}` ({d.agent}): {d.reason[:100]}")
if skipped:
lines += ["", f"## Skipped ({len(skipped)} issues)", ""]
for d in skipped[:5]:
lines.append(f"- #{d.issue_number}: {d.reason[:80]}")
if len(skipped) > 5:
lines.append(f"- … and {len(skipped) - 5} more")
lines += [
"",
"---",
"*Auto-generated by Timmy's backlog triage loop. "
"Override any decision by reassigning or commenting.*",
]
return "\n".join(lines)
async def post_daily_summary(
client: httpx.AsyncClient,
result: TriageCycleResult,
scored: list[ScoredIssue],
dry_run: bool = False,
) -> bool:
"""Post a daily triage summary as a new Gitea issue."""
today = datetime.now(UTC).strftime("%Y-%m-%d")
title = f"[Triage] Daily backlog summary — {today}"
body = _build_daily_summary(result, scored)
if dry_run:
logger.info("[DRY RUN] Would post daily summary: %s", title)
return True
url = _repo_url("issues")
try:
resp = await client.post(
url,
headers=_api_headers(),
json={
"title": title,
"body": body,
"labels": [],
},
)
if resp.status_code in (200, 201):
issue_num = resp.json().get("number", "?")
logger.info("Daily triage summary posted as issue #%s", issue_num)
return True
logger.warning("Daily summary post failed (HTTP %s)", resp.status_code)
return False
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to post daily summary: %s", exc)
return False
# ── Main loop class ───────────────────────────────────────────────────────────
class BacklogTriageLoop:
"""Autonomous backlog triage loop.
Fetches, scores, and assigns Gitea issues on a configurable interval.
Parameters
----------
interval:
Seconds between triage cycles. Default: settings.backlog_triage_interval_seconds.
dry_run:
When True, score and log decisions but don't write to Gitea.
daily_summary:
When True, post a daily triage summary issue after each cycle.
"""
def __init__(
self,
*,
interval: float | None = None,
dry_run: bool | None = None,
daily_summary: bool | None = None,
) -> None:
self._interval = float(interval or settings.backlog_triage_interval_seconds)
self._dry_run = dry_run if dry_run is not None else settings.backlog_triage_dry_run
self._daily_summary = (
daily_summary if daily_summary is not None else settings.backlog_triage_daily_summary
)
self._running = False
self._task: asyncio.Task | None = None
self._cycle_count = 0
self._last_summary_date: str = ""
self.history: list[TriageCycleResult] = []
@property
def is_running(self) -> bool:
return self._running
@property
def cycle_count(self) -> int:
return self._cycle_count
async def run_once(self) -> TriageCycleResult:
"""Execute one full triage cycle.
1. Fetch all open Gitea issues
2. Score and prioritize
3. Decide on each unassigned ready issue
4. Execute decisions
5. Optionally post daily summary
"""
import time
self._cycle_count += 1
start = time.monotonic()
ts = datetime.now(UTC).isoformat()
result = TriageCycleResult(timestamp=ts, total_open=0, scored=0, ready=0)
if not settings.gitea_enabled or not settings.gitea_token:
logger.warning("Backlog triage: Gitea not configured — skipping cycle")
return result
async with httpx.AsyncClient(timeout=30) as client:
# 1. Fetch
raw_issues = await fetch_open_issues(client)
result.total_open = len(raw_issues)
logger.info("Triage cycle #%d: fetched %d open issues", self._cycle_count, len(raw_issues))
# 2. Score
scored = [score_issue(i) for i in raw_issues]
result.scored = len(scored)
result.ready = sum(1 for s in scored if s.ready)
# 3 & 4. Decide and execute for each issue
for issue in scored:
decision = decide(issue)
if decision.action == "skip":
result.decisions.append(decision)
continue
decision = await execute_decision(client, decision, dry_run=self._dry_run)
result.decisions.append(decision)
# Rate-limit: short pause between API writes to avoid hammering Gitea
if not self._dry_run:
await asyncio.sleep(0.5)
# 5. Daily summary (once per UTC day)
today = datetime.now(UTC).strftime("%Y-%m-%d")
if self._daily_summary and today != self._last_summary_date:
await post_daily_summary(client, result, scored, dry_run=self._dry_run)
self._last_summary_date = today
result.duration_ms = int((time.monotonic() - start) * 1000)
self.history.append(result)
assigned_count = sum(1 for d in result.decisions if d.executed and d.action != "skip")
logger.info(
"Triage cycle #%d complete (%d ms): %d open, %d ready, %d assigned",
self._cycle_count,
result.duration_ms,
result.total_open,
result.ready,
assigned_count,
)
return result
async def start(self) -> None:
"""Start the triage loop as a background task."""
if self._running:
logger.warning("BacklogTriageLoop already running")
return
self._running = True
await self._loop()
async def _loop(self) -> None:
logger.info(
"BacklogTriageLoop started (interval=%.0fs, dry_run=%s)",
self._interval,
self._dry_run,
)
while self._running:
try:
await self.run_once()
except Exception:
logger.exception("Backlog triage cycle failed")
await asyncio.sleep(self._interval)
def stop(self) -> None:
"""Signal the loop to stop after the current cycle."""
self._running = False
logger.info("BacklogTriageLoop stop requested")

View File

@@ -1,801 +0,0 @@
"""Agent dispatcher — route tasks to Claude Code, Kimi, APIs, or Timmy itself.
Timmy's dispatch system: knows what agents are available, what they're good
at, and how to send them work. Uses Gitea labels and issue comments to assign
tasks and track completion.
Dispatch flow:
1. Match task type to agent strengths
2. Check agent availability (idle or working?)
3. Dispatch task with full context (issue link, requirements, criteria)
4. Log assignment as a Gitea comment
5. Monitor for completion or timeout
6. Review output quality
7. If output fails QA → reassign or escalate
Agent interfaces:
- Claude Code → ``claude-ready`` Gitea label + issue comment
- Kimi Code → ``kimi-ready`` Gitea label + issue comment
- Agent APIs → HTTP POST to external endpoint
- Timmy (self) → direct local invocation
Usage::
from timmy.dispatcher import dispatch_task, TaskType, AgentType
result = await dispatch_task(
issue_number=1072,
task_type=TaskType.ARCHITECTURE,
title="Design the LLM router",
description="We need a cascade router...",
acceptance_criteria=["Failover works", "Metrics exposed"],
)
"""
from __future__ import annotations
import asyncio
import logging
from dataclasses import dataclass, field
from enum import Enum
from typing import Any
from config import settings
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Enumerations
# ---------------------------------------------------------------------------
class AgentType(str, Enum):
"""Known agents in the swarm."""
CLAUDE_CODE = "claude_code"
KIMI_CODE = "kimi_code"
AGENT_API = "agent_api"
TIMMY = "timmy"
class TaskType(str, Enum):
"""Categories of engineering work."""
# Claude Code strengths
ARCHITECTURE = "architecture"
REFACTORING = "refactoring"
COMPLEX_REASONING = "complex_reasoning"
CODE_REVIEW = "code_review"
# Kimi Code strengths
PARALLEL_IMPLEMENTATION = "parallel_implementation"
ROUTINE_CODING = "routine_coding"
FAST_ITERATION = "fast_iteration"
# Agent API strengths
RESEARCH = "research"
ANALYSIS = "analysis"
SPECIALIZED = "specialized"
# Timmy strengths
TRIAGE = "triage"
PLANNING = "planning"
CREATIVE = "creative"
ORCHESTRATION = "orchestration"
class DispatchStatus(str, Enum):
"""Lifecycle state of a dispatched task."""
PENDING = "pending"
ASSIGNED = "assigned"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
ESCALATED = "escalated"
TIMED_OUT = "timed_out"
# ---------------------------------------------------------------------------
# Agent registry
# ---------------------------------------------------------------------------
@dataclass
class AgentSpec:
"""Capabilities and limits for a single agent."""
name: AgentType
display_name: str
strengths: frozenset[TaskType]
gitea_label: str | None # label to apply when dispatching
max_concurrent: int = 1
interface: str = "gitea" # "gitea" | "api" | "local"
api_endpoint: str | None = None # for interface="api"
#: Authoritative agent registry — all known agents and their capabilities.
AGENT_REGISTRY: dict[AgentType, AgentSpec] = {
AgentType.CLAUDE_CODE: AgentSpec(
name=AgentType.CLAUDE_CODE,
display_name="Claude Code",
strengths=frozenset(
{
TaskType.ARCHITECTURE,
TaskType.REFACTORING,
TaskType.COMPLEX_REASONING,
TaskType.CODE_REVIEW,
}
),
gitea_label="claude-ready",
max_concurrent=1,
interface="gitea",
),
AgentType.KIMI_CODE: AgentSpec(
name=AgentType.KIMI_CODE,
display_name="Kimi Code",
strengths=frozenset(
{
TaskType.PARALLEL_IMPLEMENTATION,
TaskType.ROUTINE_CODING,
TaskType.FAST_ITERATION,
}
),
gitea_label="kimi-ready",
max_concurrent=1,
interface="gitea",
),
AgentType.AGENT_API: AgentSpec(
name=AgentType.AGENT_API,
display_name="Agent API",
strengths=frozenset(
{
TaskType.RESEARCH,
TaskType.ANALYSIS,
TaskType.SPECIALIZED,
}
),
gitea_label=None,
max_concurrent=5,
interface="api",
),
AgentType.TIMMY: AgentSpec(
name=AgentType.TIMMY,
display_name="Timmy",
strengths=frozenset(
{
TaskType.TRIAGE,
TaskType.PLANNING,
TaskType.CREATIVE,
TaskType.ORCHESTRATION,
}
),
gitea_label=None,
max_concurrent=1,
interface="local",
),
}
#: Map from task type to preferred agent (primary routing table).
_TASK_ROUTING: dict[TaskType, AgentType] = {
TaskType.ARCHITECTURE: AgentType.CLAUDE_CODE,
TaskType.REFACTORING: AgentType.CLAUDE_CODE,
TaskType.COMPLEX_REASONING: AgentType.CLAUDE_CODE,
TaskType.CODE_REVIEW: AgentType.CLAUDE_CODE,
TaskType.PARALLEL_IMPLEMENTATION: AgentType.KIMI_CODE,
TaskType.ROUTINE_CODING: AgentType.KIMI_CODE,
TaskType.FAST_ITERATION: AgentType.KIMI_CODE,
TaskType.RESEARCH: AgentType.AGENT_API,
TaskType.ANALYSIS: AgentType.AGENT_API,
TaskType.SPECIALIZED: AgentType.AGENT_API,
TaskType.TRIAGE: AgentType.TIMMY,
TaskType.PLANNING: AgentType.TIMMY,
TaskType.CREATIVE: AgentType.TIMMY,
TaskType.ORCHESTRATION: AgentType.TIMMY,
}
# ---------------------------------------------------------------------------
# Dispatch result
# ---------------------------------------------------------------------------
@dataclass
class DispatchResult:
"""Outcome of a dispatch call."""
task_type: TaskType
agent: AgentType
issue_number: int | None
status: DispatchStatus
comment_id: int | None = None
label_applied: str | None = None
error: str | None = None
retry_count: int = 0
metadata: dict[str, Any] = field(default_factory=dict)
@property
def success(self) -> bool: # noqa: D401
return self.status in (DispatchStatus.ASSIGNED, DispatchStatus.COMPLETED)
# ---------------------------------------------------------------------------
# Routing logic
# ---------------------------------------------------------------------------
def select_agent(task_type: TaskType) -> AgentType:
"""Return the best agent for *task_type* based on the routing table.
Args:
task_type: The category of engineering work to be done.
Returns:
The :class:`AgentType` best suited to handle this task.
"""
return _TASK_ROUTING.get(task_type, AgentType.TIMMY)
def infer_task_type(title: str, description: str = "") -> TaskType:
"""Heuristic: guess the most appropriate :class:`TaskType` from text.
Scans *title* and *description* for keyword signals and returns the
strongest match. Falls back to :attr:`TaskType.ROUTINE_CODING`.
Args:
title: Short task title.
description: Longer task description (optional).
Returns:
The inferred :class:`TaskType`.
"""
text = (title + " " + description).lower()
_SIGNALS: list[tuple[TaskType, frozenset[str]]] = [
(TaskType.ARCHITECTURE, frozenset({"architect", "design", "adr", "system design", "schema"})),
(TaskType.REFACTORING, frozenset({"refactor", "clean up", "cleanup", "reorganise", "reorganize"})),
(TaskType.CODE_REVIEW, frozenset({"review", "pr review", "pull request review", "audit"})),
(TaskType.COMPLEX_REASONING, frozenset({"complex", "hard problem", "debug", "investigate", "diagnose"})),
(TaskType.RESEARCH, frozenset({"research", "survey", "literature", "benchmark", "analyse", "analyze"})),
(TaskType.ANALYSIS, frozenset({"analysis", "profil", "trace", "metric", "performance"})),
(TaskType.TRIAGE, frozenset({"triage", "classify", "prioritise", "prioritize"})),
(TaskType.PLANNING, frozenset({"plan", "roadmap", "milestone", "epic", "spike"})),
(TaskType.CREATIVE, frozenset({"creative", "persona", "story", "write", "draft"})),
(TaskType.ORCHESTRATION, frozenset({"orchestrat", "coordinat", "swarm", "dispatch"})),
(TaskType.PARALLEL_IMPLEMENTATION, frozenset({"parallel", "concurrent", "batch"})),
(TaskType.FAST_ITERATION, frozenset({"quick", "fast", "iterate", "prototype", "poc"})),
]
for task_type, keywords in _SIGNALS:
if any(kw in text for kw in keywords):
return task_type
return TaskType.ROUTINE_CODING
# ---------------------------------------------------------------------------
# Gitea helpers
# ---------------------------------------------------------------------------
async def _post_gitea_comment(
client: Any,
base_url: str,
repo: str,
headers: dict[str, str],
issue_number: int,
body: str,
) -> int | None:
"""Post a comment on a Gitea issue and return the comment ID."""
try:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
json={"body": body},
)
if resp.status_code in (200, 201):
return resp.json().get("id")
logger.warning(
"Comment on #%s returned %s: %s",
issue_number,
resp.status_code,
resp.text[:200],
)
except Exception as exc:
logger.warning("Failed to post comment on #%s: %s", issue_number, exc)
return None
async def _apply_gitea_label(
client: Any,
base_url: str,
repo: str,
headers: dict[str, str],
issue_number: int,
label_name: str,
label_color: str = "#0075ca",
) -> bool:
"""Ensure *label_name* exists and apply it to an issue.
Returns True if the label was successfully applied.
"""
# Resolve or create the label
label_id: int | None = None
try:
resp = await client.get(f"{base_url}/repos/{repo}/labels", headers=headers)
if resp.status_code == 200:
for lbl in resp.json():
if lbl.get("name") == label_name:
label_id = lbl["id"]
break
except Exception as exc:
logger.warning("Failed to list labels: %s", exc)
return False
if label_id is None:
try:
resp = await client.post(
f"{base_url}/repos/{repo}/labels",
headers=headers,
json={"name": label_name, "color": label_color},
)
if resp.status_code in (200, 201):
label_id = resp.json().get("id")
except Exception as exc:
logger.warning("Failed to create label %r: %s", label_name, exc)
return False
if label_id is None:
return False
# Apply label to the issue
try:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/labels",
headers=headers,
json={"labels": [label_id]},
)
return resp.status_code in (200, 201)
except Exception as exc:
logger.warning("Failed to apply label %r to #%s: %s", label_name, issue_number, exc)
return False
async def _poll_issue_completion(
issue_number: int,
poll_interval: int = 60,
max_wait: int = 7200,
) -> DispatchStatus:
"""Poll a Gitea issue until closed (completed) or timeout.
Args:
issue_number: Gitea issue to watch.
poll_interval: Seconds between polls.
max_wait: Maximum total seconds to wait.
Returns:
:attr:`DispatchStatus.COMPLETED` if the issue was closed,
:attr:`DispatchStatus.TIMED_OUT` otherwise.
"""
try:
import httpx
except ImportError as exc:
logger.warning("poll_issue_completion: missing dependency: %s", exc)
return DispatchStatus.FAILED
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
issue_url = f"{base_url}/repos/{repo}/issues/{issue_number}"
elapsed = 0
while elapsed < max_wait:
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.get(issue_url, headers=headers)
if resp.status_code == 200 and resp.json().get("state") == "closed":
logger.info("Issue #%s closed — task completed", issue_number)
return DispatchStatus.COMPLETED
except Exception as exc:
logger.warning("Poll error for issue #%s: %s", issue_number, exc)
await asyncio.sleep(poll_interval)
elapsed += poll_interval
logger.warning("Timed out waiting for issue #%s after %ss", issue_number, max_wait)
return DispatchStatus.TIMED_OUT
# ---------------------------------------------------------------------------
# Core dispatch functions
# ---------------------------------------------------------------------------
async def _dispatch_via_gitea(
agent: AgentType,
issue_number: int,
title: str,
description: str,
acceptance_criteria: list[str],
) -> DispatchResult:
"""Assign a task by applying a Gitea label and posting an assignment comment.
Args:
agent: Target agent.
issue_number: Gitea issue to assign.
title: Short task title.
description: Full task description.
acceptance_criteria: List of acceptance criteria strings.
Returns:
:class:`DispatchResult` describing the outcome.
"""
try:
import httpx
except ImportError as exc:
return DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error=f"Missing dependency: {exc}",
)
spec = AGENT_REGISTRY[agent]
task_type = infer_task_type(title, description)
if not settings.gitea_enabled or not settings.gitea_token:
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error="Gitea integration not configured (no token or disabled).",
)
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
comment_id: int | None = None
label_applied: str | None = None
async with httpx.AsyncClient(timeout=15) as client:
# 1. Apply agent label (if applicable)
if spec.gitea_label:
ok = await _apply_gitea_label(
client, base_url, repo, headers, issue_number, spec.gitea_label
)
if ok:
label_applied = spec.gitea_label
logger.info(
"Applied label %r to issue #%s for %s",
spec.gitea_label,
issue_number,
spec.display_name,
)
else:
logger.warning(
"Could not apply label %r to issue #%s",
spec.gitea_label,
issue_number,
)
# 2. Post assignment comment
criteria_md = "\n".join(f"- {c}" for c in acceptance_criteria) if acceptance_criteria else "_None specified_"
comment_body = (
f"## Assigned to {spec.display_name}\n\n"
f"**Task type:** `{task_type.value}`\n\n"
f"**Description:**\n{description}\n\n"
f"**Acceptance criteria:**\n{criteria_md}\n\n"
f"---\n*Dispatched by Timmy agent dispatcher.*"
)
comment_id = await _post_gitea_comment(
client, base_url, repo, headers, issue_number, comment_body
)
if comment_id is not None or label_applied is not None:
logger.info(
"Dispatched issue #%s to %s (label=%r, comment=%s)",
issue_number,
spec.display_name,
label_applied,
comment_id,
)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.ASSIGNED,
comment_id=comment_id,
label_applied=label_applied,
)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error="Failed to apply label and post comment — check Gitea connectivity.",
)
async def _dispatch_via_api(
agent: AgentType,
title: str,
description: str,
acceptance_criteria: list[str],
issue_number: int | None = None,
endpoint: str | None = None,
) -> DispatchResult:
"""Dispatch a task to an external HTTP API agent.
Args:
agent: Target agent.
title: Short task title.
description: Task description.
acceptance_criteria: List of acceptance criteria.
issue_number: Optional Gitea issue for cross-referencing.
endpoint: Override API endpoint URL (uses spec default if omitted).
Returns:
:class:`DispatchResult` describing the outcome.
"""
spec = AGENT_REGISTRY[agent]
task_type = infer_task_type(title, description)
url = endpoint or spec.api_endpoint
if not url:
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error=f"No API endpoint configured for agent {agent.value}.",
)
payload = {
"title": title,
"description": description,
"acceptance_criteria": acceptance_criteria,
"issue_number": issue_number,
"agent": agent.value,
"task_type": task_type.value,
}
try:
import httpx
async with httpx.AsyncClient(timeout=30) as client:
resp = await client.post(url, json=payload)
if resp.status_code in (200, 201, 202):
logger.info("Dispatched %r to API agent %s at %s", title[:60], agent.value, url)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.ASSIGNED,
metadata={"response": resp.json() if resp.content else {}},
)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error=f"API agent returned {resp.status_code}: {resp.text[:200]}",
)
except Exception as exc:
logger.warning("API dispatch to %s failed: %s", url, exc)
return DispatchResult(
task_type=task_type,
agent=agent,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error=str(exc),
)
async def _dispatch_local(
title: str,
description: str = "",
acceptance_criteria: list[str] | None = None,
issue_number: int | None = None,
) -> DispatchResult:
"""Handle a task locally — Timmy processes it directly.
This is a lightweight stub. Real local execution should be wired
into the agentic loop or a dedicated Timmy tool.
Args:
title: Short task title.
description: Task description.
acceptance_criteria: Acceptance criteria list.
issue_number: Optional Gitea issue number for logging.
Returns:
:class:`DispatchResult` with ASSIGNED status (local execution is
assumed to succeed at dispatch time).
"""
task_type = infer_task_type(title, description)
logger.info(
"Timmy handling task locally: %r (issue #%s)", title[:60], issue_number
)
return DispatchResult(
task_type=task_type,
agent=AgentType.TIMMY,
issue_number=issue_number,
status=DispatchStatus.ASSIGNED,
metadata={"local": True, "description": description},
)
# ---------------------------------------------------------------------------
# Public entry point
# ---------------------------------------------------------------------------
async def dispatch_task(
title: str,
description: str = "",
acceptance_criteria: list[str] | None = None,
task_type: TaskType | None = None,
agent: AgentType | None = None,
issue_number: int | None = None,
api_endpoint: str | None = None,
max_retries: int = 1,
) -> DispatchResult:
"""Route a task to the best available agent.
This is the primary entry point. Callers can either specify the
*agent* and *task_type* explicitly or let the dispatcher infer them
from the *title* and *description*.
Args:
title: Short human-readable task title.
description: Full task description with context.
acceptance_criteria: List of acceptance criteria strings.
task_type: Override automatic task type inference.
agent: Override automatic agent selection.
issue_number: Gitea issue number to log the assignment on.
api_endpoint: Override API endpoint for AGENT_API dispatches.
max_retries: Number of retry attempts on failure (default 1).
Returns:
:class:`DispatchResult` describing the final dispatch outcome.
Example::
result = await dispatch_task(
issue_number=1072,
title="Build the cascade LLM router",
description="We need automatic failover...",
acceptance_criteria=["Circuit breaker works", "Metrics exposed"],
)
if result.success:
print(f"Assigned to {result.agent.value}")
"""
criteria = acceptance_criteria or []
if not title.strip():
return DispatchResult(
task_type=task_type or TaskType.ROUTINE_CODING,
agent=agent or AgentType.TIMMY,
issue_number=issue_number,
status=DispatchStatus.FAILED,
error="`title` is required.",
)
resolved_type = task_type or infer_task_type(title, description)
resolved_agent = agent or select_agent(resolved_type)
logger.info(
"Dispatching task %r%s (type=%s, issue=#%s)",
title[:60],
resolved_agent.value,
resolved_type.value,
issue_number,
)
spec = AGENT_REGISTRY[resolved_agent]
last_result: DispatchResult | None = None
for attempt in range(max_retries + 1):
if attempt > 0:
logger.info("Retry %d/%d for task %r", attempt, max_retries, title[:60])
if spec.interface == "gitea" and issue_number is not None:
result = await _dispatch_via_gitea(
resolved_agent, issue_number, title, description, criteria
)
elif spec.interface == "api":
result = await _dispatch_via_api(
resolved_agent, title, description, criteria, issue_number, api_endpoint
)
else:
result = await _dispatch_local(title, description, criteria, issue_number)
result.retry_count = attempt
last_result = result
if result.success:
return result
logger.warning(
"Dispatch attempt %d failed for task %r: %s",
attempt + 1,
title[:60],
result.error,
)
# All attempts exhausted — escalate
assert last_result is not None
last_result.status = DispatchStatus.ESCALATED
logger.error(
"Task %r escalated after %d failed attempt(s): %s",
title[:60],
max_retries + 1,
last_result.error,
)
# Try to log the escalation on the issue
if issue_number is not None:
await _log_escalation(issue_number, resolved_agent, last_result.error or "unknown error")
return last_result
async def _log_escalation(
issue_number: int,
agent: AgentType,
error: str,
) -> None:
"""Post an escalation notice on the Gitea issue."""
try:
import httpx
if not settings.gitea_enabled or not settings.gitea_token:
return
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
body = (
f"## Dispatch Escalated\n\n"
f"Could not assign to **{AGENT_REGISTRY[agent].display_name}** "
f"after {1} attempt(s).\n\n"
f"**Error:** {error}\n\n"
f"Manual intervention required.\n\n"
f"---\n*Timmy agent dispatcher.*"
)
async with httpx.AsyncClient(timeout=10) as client:
await _post_gitea_comment(
client, base_url, repo, headers, issue_number, body
)
except Exception as exc:
logger.warning("Failed to post escalation comment: %s", exc)
# ---------------------------------------------------------------------------
# Monitoring helper
# ---------------------------------------------------------------------------
async def wait_for_completion(
issue_number: int,
poll_interval: int = 60,
max_wait: int = 7200,
) -> DispatchStatus:
"""Block until the assigned Gitea issue is closed or the timeout fires.
Useful for synchronous orchestration where the caller wants to wait for
the assigned agent to finish before proceeding.
Args:
issue_number: Gitea issue to monitor.
poll_interval: Seconds between status polls.
max_wait: Maximum wait in seconds (default 2 hours).
Returns:
:attr:`DispatchStatus.COMPLETED` or :attr:`DispatchStatus.TIMED_OUT`.
"""
return await _poll_issue_completion(issue_number, poll_interval, max_wait)

View File

@@ -462,8 +462,7 @@ def consult_grok(query: str) -> str:
inv = ln.create_invoice(sats, f"Grok query: {query[:_INVOICE_MEMO_MAX_LEN]}")
invoice_info = f"\n[Lightning invoice: {sats} sats — {inv.payment_request[:40]}...]"
except (ImportError, OSError, ValueError) as exc:
logger.error("Lightning invoice creation failed: %s", exc)
return "Error: Failed to create Lightning invoice. Please check logs."
logger.warning("Tool execution failed (Lightning invoice): %s", exc)
result = backend.run(query)
@@ -534,8 +533,7 @@ def _register_web_fetch_tool(toolkit: Toolkit) -> None:
try:
toolkit.register(web_fetch, name="web_fetch")
except Exception as exc:
logger.error("Failed to register web_fetch tool: %s", exc)
raise
logger.warning("Tool execution failed (web_fetch registration): %s", exc)
def _register_core_tools(toolkit: Toolkit, base_path: Path) -> None:
@@ -567,8 +565,8 @@ def _register_grok_tool(toolkit: Toolkit) -> None:
toolkit.register(consult_grok, name="consult_grok")
logger.info("Grok consultation tool registered")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Grok tool: %s", exc)
raise
logger.warning("Tool execution failed (Grok registration): %s", exc)
logger.debug("Grok tool not available")
def _register_memory_tools(toolkit: Toolkit) -> None:
@@ -581,8 +579,8 @@ def _register_memory_tools(toolkit: Toolkit) -> None:
toolkit.register(memory_read, name="memory_read")
toolkit.register(memory_forget, name="memory_forget")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Memory tools: %s", exc)
raise
logger.warning("Tool execution failed (Memory tools registration): %s", exc)
logger.debug("Memory tools not available")
def _register_agentic_loop_tool(toolkit: Toolkit) -> None:
@@ -630,8 +628,8 @@ def _register_agentic_loop_tool(toolkit: Toolkit) -> None:
toolkit.register(plan_and_execute, name="plan_and_execute")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register plan_and_execute tool: %s", exc)
raise
logger.warning("Tool execution failed (plan_and_execute registration): %s", exc)
logger.debug("plan_and_execute tool not available")
def _register_introspection_tools(toolkit: Toolkit) -> None:
@@ -649,16 +647,15 @@ def _register_introspection_tools(toolkit: Toolkit) -> None:
toolkit.register(get_memory_status, name="get_memory_status")
toolkit.register(run_self_tests, name="run_self_tests")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Introspection tools: %s", exc)
raise
logger.warning("Tool execution failed (Introspection tools registration): %s", exc)
logger.debug("Introspection tools not available")
try:
from timmy.mcp_tools import update_gitea_avatar
toolkit.register(update_gitea_avatar, name="update_gitea_avatar")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register update_gitea_avatar tool: %s", exc)
raise
logger.debug("update_gitea_avatar tool not available: %s", exc)
try:
from timmy.session_logger import self_reflect, session_history
@@ -666,8 +663,8 @@ def _register_introspection_tools(toolkit: Toolkit) -> None:
toolkit.register(session_history, name="session_history")
toolkit.register(self_reflect, name="self_reflect")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register session_history tool: %s", exc)
raise
logger.warning("Tool execution failed (session_history registration): %s", exc)
logger.debug("session_history tool not available")
def _register_delegation_tools(toolkit: Toolkit) -> None:
@@ -679,8 +676,8 @@ def _register_delegation_tools(toolkit: Toolkit) -> None:
toolkit.register(delegate_to_kimi, name="delegate_to_kimi")
toolkit.register(list_swarm_agents, name="list_swarm_agents")
except Exception as exc:
logger.error("Failed to register Delegation tools: %s", exc)
raise
logger.warning("Tool execution failed (Delegation tools registration): %s", exc)
logger.debug("Delegation tools not available")
def _register_gematria_tool(toolkit: Toolkit) -> None:
@@ -690,8 +687,8 @@ def _register_gematria_tool(toolkit: Toolkit) -> None:
toolkit.register(gematria, name="gematria")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Gematria tool: %s", exc)
raise
logger.warning("Tool execution failed (Gematria registration): %s", exc)
logger.debug("Gematria tool not available")
def _register_artifact_tools(toolkit: Toolkit) -> None:
@@ -702,8 +699,8 @@ def _register_artifact_tools(toolkit: Toolkit) -> None:
toolkit.register(jot_note, name="jot_note")
toolkit.register(log_decision, name="log_decision")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Artifact tools: %s", exc)
raise
logger.warning("Tool execution failed (Artifact tools registration): %s", exc)
logger.debug("Artifact tools not available")
def _register_thinking_tools(toolkit: Toolkit) -> None:
@@ -713,8 +710,8 @@ def _register_thinking_tools(toolkit: Toolkit) -> None:
toolkit.register(search_thoughts, name="thought_search")
except (ImportError, AttributeError) as exc:
logger.error("Failed to register Thinking tools: %s", exc)
raise
logger.warning("Tool execution failed (Thinking tools registration): %s", exc)
logger.debug("Thinking tools not available")
def create_full_toolkit(base_dir: str | Path | None = None):

View File

@@ -1,21 +0,0 @@
"""Vassal Protocol — Timmy as autonomous orchestrator.
Timmy is Alex's vassal: the lead decision-maker for development direction,
agent management, and house health. He observes the Gitea backlog, decides
priorities, dispatches work to agents (Claude, Kimi, self), monitors output,
and keeps Hermes (M3 Max) running well.
Public API
----------
from timmy.vassal import vassal_orchestrator
await vassal_orchestrator.run_cycle()
snapshot = vassal_orchestrator.get_status()
"""
from timmy.vassal.orchestration_loop import VassalOrchestrator
# Module-level singleton — import and use directly.
vassal_orchestrator = VassalOrchestrator()
__all__ = ["VassalOrchestrator", "vassal_orchestrator"]

View File

@@ -1,296 +0,0 @@
"""Vassal Protocol — agent health monitoring.
Monitors whether downstream agents (Claude, Kimi) are making progress on
their assigned issues. Detects idle and stuck agents by querying Gitea
for issues with dispatch labels and checking last-comment timestamps.
Stuck agent heuristic
---------------------
An agent is considered "stuck" on an issue if:
- The issue has been labeled ``claude-ready`` or ``kimi-ready``
- No new comment has appeared in the last ``stuck_threshold_minutes``
- The issue has not been closed
Idle agent heuristic
--------------------
An agent is "idle" if it has no currently assigned (labeled) open issues.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
_AGENT_LABELS = {
"claude": "claude-ready",
"kimi": "kimi-ready",
}
_DEFAULT_STUCK_MINUTES = 120
_DEFAULT_IDLE_THRESHOLD = 30
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class AgentStatus:
"""Health snapshot for one agent at a point in time."""
agent: str # "claude" | "kimi" | "timmy"
is_idle: bool = True
active_issue_numbers: list[int] = field(default_factory=list)
stuck_issue_numbers: list[int] = field(default_factory=list)
checked_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def is_stuck(self) -> bool:
return bool(self.stuck_issue_numbers)
@property
def needs_reassignment(self) -> bool:
return self.is_stuck
@dataclass
class AgentHealthReport:
"""Combined health report for all monitored agents."""
agents: list[AgentStatus] = field(default_factory=list)
generated_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def any_stuck(self) -> bool:
return any(a.is_stuck for a in self.agents)
@property
def all_idle(self) -> bool:
return all(a.is_idle for a in self.agents)
def for_agent(self, name: str) -> AgentStatus | None:
for a in self.agents:
if a.agent == name:
return a
return None
# ---------------------------------------------------------------------------
# Gitea queries
# ---------------------------------------------------------------------------
async def _fetch_labeled_issues(
client: Any,
base_url: str,
headers: dict,
repo: str,
label: str,
) -> list[dict]:
"""Return open issues carrying a specific label."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params={"state": "open", "labels": label, "limit": 50},
)
if resp.status_code == 200:
return [i for i in resp.json() if not i.get("pull_request")]
except Exception as exc:
logger.warning("_fetch_labeled_issues: %s%s", label, exc)
return []
async def _last_comment_time(
client: Any,
base_url: str,
headers: dict,
repo: str,
issue_number: int,
) -> datetime | None:
"""Return the timestamp of the most recent comment on an issue."""
try:
resp = await client.get(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
params={"limit": 1},
)
if resp.status_code == 200:
comments = resp.json()
if comments:
ts = comments[-1].get("updated_at") or comments[-1].get("created_at")
if ts:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except Exception as exc:
logger.debug("_last_comment_time: issue #%d%s", issue_number, exc)
return None
async def _issue_created_time(issue: dict) -> datetime | None:
ts = issue.get("created_at")
if ts:
try:
return datetime.fromisoformat(ts.replace("Z", "+00:00"))
except ValueError:
pass
return None
# ---------------------------------------------------------------------------
# Health check
# ---------------------------------------------------------------------------
async def check_agent_health(
agent_name: str,
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentStatus:
"""Query Gitea for issues assigned to *agent_name* and assess health.
Args:
agent_name: One of "claude", "kimi".
stuck_threshold_minutes: Minutes of silence before an issue is
considered stuck.
Returns:
AgentStatus for this agent.
"""
status = AgentStatus(agent=agent_name)
label = _AGENT_LABELS.get(agent_name)
if not label:
logger.debug("check_agent_health: unknown agent %s", agent_name)
return status
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("check_agent_health: missing dependency — %s", exc)
return status
if not settings.gitea_enabled or not settings.gitea_token:
return status
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
cutoff = datetime.now(UTC) - timedelta(minutes=stuck_threshold_minutes)
try:
async with httpx.AsyncClient(timeout=15) as client:
issues = await _fetch_labeled_issues(
client, base_url, headers, repo, label
)
for issue in issues:
num = issue.get("number", 0)
status.active_issue_numbers.append(num)
# Check last activity
last_activity = await _last_comment_time(
client, base_url, headers, repo, num
)
if last_activity is None:
last_activity = await _issue_created_time(issue)
if last_activity is not None and last_activity < cutoff:
status.stuck_issue_numbers.append(num)
logger.info(
"check_agent_health: %s issue #%d stuck since %s",
agent_name,
num,
last_activity.isoformat(),
)
except Exception as exc:
logger.warning("check_agent_health: %s query failed — %s", agent_name, exc)
status.is_idle = len(status.active_issue_numbers) == 0
return status
async def get_full_health_report(
stuck_threshold_minutes: int = _DEFAULT_STUCK_MINUTES,
) -> AgentHealthReport:
"""Run health checks for all monitored agents and return combined report.
Args:
stuck_threshold_minutes: Passed through to each agent check.
Returns:
AgentHealthReport with status for Claude and Kimi.
"""
import asyncio
claude_status, kimi_status = await asyncio.gather(
check_agent_health("claude", stuck_threshold_minutes),
check_agent_health("kimi", stuck_threshold_minutes),
)
return AgentHealthReport(agents=[claude_status, kimi_status])
async def nudge_stuck_agent(
agent_name: str,
issue_number: int,
) -> bool:
"""Post a nudge comment on a stuck issue to prompt the agent.
Args:
agent_name: The agent that appears stuck.
issue_number: The Gitea issue number to nudge.
Returns:
True if the comment was posted successfully.
"""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("nudge_stuck_agent: missing dependency — %s", exc)
return False
if not settings.gitea_enabled or not settings.gitea_token:
return False
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
body = (
f"⏰ **Vassal nudge** — @{agent_name} this issue has been idle.\n\n"
"Please post a status update or close if complete."
)
try:
async with httpx.AsyncClient(timeout=10) as client:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue_number}/comments",
headers=headers,
json={"body": body},
)
if resp.status_code in (200, 201):
logger.info(
"nudge_stuck_agent: nudged %s on issue #%d",
agent_name,
issue_number,
)
return True
except Exception as exc:
logger.warning("nudge_stuck_agent: failed — %s", exc)
return False

View File

@@ -1,281 +0,0 @@
"""Vassal Protocol — Gitea backlog triage.
Fetches open issues from Gitea, scores each one for priority and agent
suitability, and returns a ranked list ready for dispatch.
Complexity scoring heuristics
------------------------------
high_complexity_keywords → route to Claude (architecture, refactor, review)
research_keywords → route to Kimi (survey, analysis, benchmark)
routine_keywords → route to Timmy/self (docs, chore, config)
otherwise → Timmy self-handles
Priority scoring
----------------
URGENT label → 100
HIGH / critical → 75
NORMAL (default) → 50
LOW / chore → 25
Already assigned → deprioritized (subtract 20)
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from enum import StrEnum
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Constants
# ---------------------------------------------------------------------------
# Labels that hint at complexity level / agent suitability
_HIGH_COMPLEXITY = frozenset(
{
"architecture",
"refactor",
"code review",
"security",
"performance",
"breaking change",
"design",
"complex",
}
)
_RESEARCH_KEYWORDS = frozenset(
{
"research",
"survey",
"analysis",
"benchmark",
"comparative",
"investigation",
"deep dive",
"review",
}
)
_ROUTINE_KEYWORDS = frozenset(
{
"docs",
"documentation",
"chore",
"config",
"typo",
"rename",
"cleanup",
"trivial",
"style",
}
)
_PRIORITY_LABEL_SCORES: dict[str, int] = {
"urgent": 100,
"critical": 90,
"high": 75,
"normal": 50,
"low": 25,
"chore": 20,
}
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
class AgentTarget(StrEnum):
"""Which agent should handle this issue."""
TIMMY = "timmy" # Timmy handles locally (self)
CLAUDE = "claude" # Dispatch to Claude Code
KIMI = "kimi" # Dispatch to Kimi Code
@dataclass
class TriagedIssue:
"""A Gitea issue enriched with triage metadata."""
number: int
title: str
body: str
labels: list[str] = field(default_factory=list)
assignees: list[str] = field(default_factory=list)
priority_score: int = 50
agent_target: AgentTarget = AgentTarget.TIMMY
rationale: str = ""
url: str = ""
raw: dict = field(default_factory=dict)
# ---------------------------------------------------------------------------
# Scoring helpers
# ---------------------------------------------------------------------------
def _extract_labels(issue: dict[str, Any]) -> list[str]:
"""Return normalised label names from a raw Gitea issue dict."""
return [lbl.get("name", "").lower() for lbl in issue.get("labels", [])]
def _score_priority(labels: list[str], assignees: list[str]) -> int:
score = _PRIORITY_LABEL_SCORES.get("normal", 50)
for lbl in labels:
for key, val in _PRIORITY_LABEL_SCORES.items():
if key in lbl:
score = max(score, val)
if assignees:
score -= 20 # already assigned — lower urgency for fresh dispatch
return max(0, score)
def _choose_agent(title: str, body: str, labels: list[str]) -> tuple[AgentTarget, str]:
"""Heuristic: pick the best agent and return (target, rationale)."""
combined = f"{title} {body} {' '.join(labels)}".lower()
if any(kw in combined for kw in _HIGH_COMPLEXITY):
return AgentTarget.CLAUDE, "high-complexity keywords detected"
if any(kw in combined for kw in _RESEARCH_KEYWORDS):
return AgentTarget.KIMI, "research keywords detected"
if any(kw in combined for kw in _ROUTINE_KEYWORDS):
return AgentTarget.TIMMY, "routine task — Timmy self-handles"
return AgentTarget.TIMMY, "no specific routing signal — Timmy self-handles"
# ---------------------------------------------------------------------------
# Triage
# ---------------------------------------------------------------------------
def triage_issues(raw_issues: list[dict[str, Any]]) -> list[TriagedIssue]:
"""Score and route a list of raw Gitea issue dicts.
Returns a list sorted by priority_score descending (highest first).
Args:
raw_issues: List of issue objects from the Gitea API.
Returns:
Sorted list of TriagedIssue with routing decisions.
"""
results: list[TriagedIssue] = []
for issue in raw_issues:
number = issue.get("number", 0)
title = issue.get("title", "")
body = issue.get("body") or ""
labels = _extract_labels(issue)
assignees = [
a.get("login", "") for a in issue.get("assignees") or []
]
url = issue.get("html_url", "")
priority = _score_priority(labels, assignees)
agent, rationale = _choose_agent(title, body, labels)
results.append(
TriagedIssue(
number=number,
title=title,
body=body,
labels=labels,
assignees=assignees,
priority_score=priority,
agent_target=agent,
rationale=rationale,
url=url,
raw=issue,
)
)
results.sort(key=lambda i: i.priority_score, reverse=True)
logger.debug(
"Triage complete: %d issues → %d Claude, %d Kimi, %d Timmy",
len(results),
sum(1 for i in results if i.agent_target == AgentTarget.CLAUDE),
sum(1 for i in results if i.agent_target == AgentTarget.KIMI),
sum(1 for i in results if i.agent_target == AgentTarget.TIMMY),
)
return results
# ---------------------------------------------------------------------------
# Gitea fetch (async, gracefully degrading)
# ---------------------------------------------------------------------------
async def fetch_open_issues(
limit: int = 50,
exclude_labels: list[str] | None = None,
) -> list[dict[str, Any]]:
"""Fetch open issues from the configured Gitea repo.
Args:
limit: Maximum number of issues to return.
exclude_labels: Labels whose issues should be skipped
(e.g. ``["kimi-ready", "wip"]``).
Returns:
List of raw issue dicts from the Gitea API,
or empty list if Gitea is unavailable.
"""
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("fetch_open_issues: missing dependency — %s", exc)
return []
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("fetch_open_issues: Gitea disabled or no token")
return []
exclude = set(lbl.lower() for lbl in (exclude_labels or []))
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {"Authorization": f"token {settings.gitea_token}"}
params = {"state": "open", "limit": min(limit, 50), "page": 1}
try:
async with httpx.AsyncClient(timeout=15) as client:
resp = await client.get(
f"{base_url}/repos/{repo}/issues",
headers=headers,
params=params,
)
if resp.status_code != 200:
logger.warning(
"fetch_open_issues: Gitea returned %s", resp.status_code
)
return []
issues = resp.json()
# Filter out pull requests and excluded labels
filtered = []
for issue in issues:
if issue.get("pull_request"):
continue # skip PRs
labels = _extract_labels(issue)
if exclude and any(lbl in exclude for lbl in labels):
continue
filtered.append(issue)
logger.info(
"fetch_open_issues: fetched %d/%d issues (after filtering)",
len(filtered),
len(issues),
)
return filtered
except Exception as exc:
logger.warning("fetch_open_issues: Gitea request failed — %s", exc)
return []

View File

@@ -1,213 +0,0 @@
"""Vassal Protocol — agent dispatch.
Translates triage decisions into concrete Gitea actions:
- Add ``claude-ready`` or ``kimi-ready`` label to an issue
- Post a dispatch comment recording the routing rationale
- Record the dispatch in the in-memory registry so the orchestration loop
can track what was sent and when
The dispatch registry is intentionally in-memory (ephemeral). Durable
tracking is out of scope for this module — that belongs in the task queue
or a future orchestration DB.
"""
from __future__ import annotations
import logging
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
from timmy.vassal.backlog import AgentTarget, TriagedIssue
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Label names used by the dispatch system
# ---------------------------------------------------------------------------
_LABEL_MAP: dict[AgentTarget, str] = {
AgentTarget.CLAUDE: "claude-ready",
AgentTarget.KIMI: "kimi-ready",
AgentTarget.TIMMY: "timmy-ready",
}
_LABEL_COLORS: dict[str, str] = {
"claude-ready": "#8b6f47", # warm brown
"kimi-ready": "#006b75", # dark teal
"timmy-ready": "#0075ca", # blue
}
# ---------------------------------------------------------------------------
# Dispatch registry
# ---------------------------------------------------------------------------
@dataclass
class DispatchRecord:
"""A record of one issue being dispatched to an agent."""
issue_number: int
issue_title: str
agent: AgentTarget
rationale: str
dispatched_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
label_applied: bool = False
comment_posted: bool = False
# Module-level registry: issue_number → DispatchRecord
_registry: dict[int, DispatchRecord] = {}
def get_dispatch_registry() -> dict[int, DispatchRecord]:
"""Return a copy of the current dispatch registry."""
return dict(_registry)
def clear_dispatch_registry() -> None:
"""Clear the dispatch registry (mainly for tests)."""
_registry.clear()
# ---------------------------------------------------------------------------
# Gitea helpers
# ---------------------------------------------------------------------------
async def _get_or_create_label(
client: Any,
base_url: str,
headers: dict,
repo: str,
label_name: str,
) -> int | None:
"""Return the Gitea label ID, creating it if necessary."""
labels_url = f"{base_url}/repos/{repo}/labels"
try:
resp = await client.get(labels_url, headers=headers)
if resp.status_code == 200:
for lbl in resp.json():
if lbl.get("name") == label_name:
return lbl["id"]
except Exception as exc:
logger.warning("_get_or_create_label: list failed — %s", exc)
return None
color = _LABEL_COLORS.get(label_name, "#cccccc")
try:
resp = await client.post(
labels_url,
headers={**headers, "Content-Type": "application/json"},
json={"name": label_name, "color": color},
)
if resp.status_code in (200, 201):
return resp.json().get("id")
except Exception as exc:
logger.warning("_get_or_create_label: create failed — %s", exc)
return None
# ---------------------------------------------------------------------------
# Dispatch action
# ---------------------------------------------------------------------------
async def dispatch_issue(issue: TriagedIssue) -> DispatchRecord:
"""Apply dispatch label and post a routing comment on the Gitea issue.
Gracefully degrades: if Gitea is unavailable the record is still
created and returned (with label_applied=False, comment_posted=False).
Args:
issue: A TriagedIssue with a routing decision.
Returns:
DispatchRecord summarising what was done.
"""
record = DispatchRecord(
issue_number=issue.number,
issue_title=issue.title,
agent=issue.agent_target,
rationale=issue.rationale,
)
if issue.agent_target == AgentTarget.TIMMY:
# Self-dispatch: no label needed — Timmy will handle directly.
logger.info(
"dispatch_issue: #%d '%s' → Timmy (self, no label)",
issue.number,
issue.title[:50],
)
_registry[issue.number] = record
return record
try:
import httpx
from config import settings
except ImportError as exc:
logger.warning("dispatch_issue: missing dependency — %s", exc)
_registry[issue.number] = record
return record
if not settings.gitea_enabled or not settings.gitea_token:
logger.info("dispatch_issue: Gitea disabled — skipping label/comment")
_registry[issue.number] = record
return record
base_url = f"{settings.gitea_url}/api/v1"
repo = settings.gitea_repo
headers = {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
}
label_name = _LABEL_MAP[issue.agent_target]
try:
async with httpx.AsyncClient(timeout=15) as client:
label_id = await _get_or_create_label(
client, base_url, headers, repo, label_name
)
# Apply label
if label_id is not None:
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/labels",
headers=headers,
json={"labels": [label_id]},
)
record.label_applied = resp.status_code in (200, 201)
# Post routing comment
agent_name = issue.agent_target.value.capitalize()
comment_body = (
f"🤖 **Vassal dispatch** → routed to **{agent_name}**\n\n"
f"Priority score: {issue.priority_score} \n"
f"Rationale: {issue.rationale} \n"
f"Label: `{label_name}`"
)
resp = await client.post(
f"{base_url}/repos/{repo}/issues/{issue.number}/comments",
headers=headers,
json={"body": comment_body},
)
record.comment_posted = resp.status_code in (200, 201)
except Exception as exc:
logger.warning("dispatch_issue: Gitea action failed — %s", exc)
_registry[issue.number] = record
logger.info(
"dispatch_issue: #%d '%s'%s (label=%s comment=%s)",
issue.number,
issue.title[:50],
issue.agent_target,
record.label_applied,
record.comment_posted,
)
return record

View File

@@ -1,222 +0,0 @@
"""Vassal Protocol — Hermes house health monitoring.
Monitors system resources on the M3 Max (Hermes) and Ollama model state.
Reports warnings when resources are tight and provides cleanup utilities.
All I/O is wrapped in asyncio.to_thread() per CLAUDE.md convention.
"""
from __future__ import annotations
import asyncio
import logging
import shutil
from dataclasses import dataclass, field
from datetime import UTC, datetime
from pathlib import Path
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Thresholds
# ---------------------------------------------------------------------------
_WARN_DISK_PCT = 85.0 # warn when disk is more than 85% full
_WARN_MEM_PCT = 90.0 # warn when memory is more than 90% used
_WARN_CPU_PCT = 95.0 # warn when CPU is above 95% sustained
# ---------------------------------------------------------------------------
# Data models
# ---------------------------------------------------------------------------
@dataclass
class DiskUsage:
path: str = "/"
total_gb: float = 0.0
used_gb: float = 0.0
free_gb: float = 0.0
percent_used: float = 0.0
@dataclass
class MemoryUsage:
total_gb: float = 0.0
available_gb: float = 0.0
percent_used: float = 0.0
@dataclass
class OllamaHealth:
reachable: bool = False
loaded_models: list[str] = field(default_factory=list)
error: str = ""
@dataclass
class SystemSnapshot:
"""Point-in-time snapshot of Hermes resource usage."""
disk: DiskUsage = field(default_factory=DiskUsage)
memory: MemoryUsage = field(default_factory=MemoryUsage)
ollama: OllamaHealth = field(default_factory=OllamaHealth)
warnings: list[str] = field(default_factory=list)
taken_at: str = field(
default_factory=lambda: datetime.now(UTC).isoformat()
)
@property
def healthy(self) -> bool:
return len(self.warnings) == 0
# ---------------------------------------------------------------------------
# Resource probes (sync, run in threads)
# ---------------------------------------------------------------------------
def _probe_disk(path: str = "/") -> DiskUsage:
try:
usage = shutil.disk_usage(path)
total_gb = usage.total / 1e9
used_gb = usage.used / 1e9
free_gb = usage.free / 1e9
pct = (usage.used / usage.total * 100) if usage.total > 0 else 0.0
return DiskUsage(
path=path,
total_gb=round(total_gb, 2),
used_gb=round(used_gb, 2),
free_gb=round(free_gb, 2),
percent_used=round(pct, 1),
)
except Exception as exc:
logger.debug("_probe_disk: %s", exc)
return DiskUsage(path=path)
def _probe_memory() -> MemoryUsage:
try:
import psutil # optional — gracefully degrade if absent
vm = psutil.virtual_memory()
return MemoryUsage(
total_gb=round(vm.total / 1e9, 2),
available_gb=round(vm.available / 1e9, 2),
percent_used=round(vm.percent, 1),
)
except ImportError:
logger.debug("_probe_memory: psutil not installed — skipping")
return MemoryUsage()
except Exception as exc:
logger.debug("_probe_memory: %s", exc)
return MemoryUsage()
def _probe_ollama_sync(ollama_url: str) -> OllamaHealth:
"""Synchronous Ollama health probe — run in a thread."""
try:
import urllib.request
import json
url = ollama_url.rstrip("/") + "/api/tags"
with urllib.request.urlopen(url, timeout=5) as resp: # noqa: S310
data = json.loads(resp.read())
models = [m.get("name", "") for m in data.get("models", [])]
return OllamaHealth(reachable=True, loaded_models=models)
except Exception as exc:
return OllamaHealth(reachable=False, error=str(exc)[:120])
# ---------------------------------------------------------------------------
# Public API
# ---------------------------------------------------------------------------
async def get_system_snapshot() -> SystemSnapshot:
"""Collect a non-blocking snapshot of system resources.
Uses asyncio.to_thread() for all blocking I/O per project convention.
Returns:
SystemSnapshot with disk, memory, and Ollama status.
"""
from config import settings
disk, memory, ollama = await asyncio.gather(
asyncio.to_thread(_probe_disk, "/"),
asyncio.to_thread(_probe_memory),
asyncio.to_thread(_probe_ollama_sync, settings.normalized_ollama_url),
)
warnings: list[str] = []
if disk.percent_used >= _WARN_DISK_PCT:
warnings.append(
f"Disk {disk.path}: {disk.percent_used:.0f}% used "
f"({disk.free_gb:.1f} GB free)"
)
if memory.percent_used >= _WARN_MEM_PCT:
warnings.append(
f"Memory: {memory.percent_used:.0f}% used "
f"({memory.available_gb:.1f} GB available)"
)
if not ollama.reachable:
warnings.append(f"Ollama unreachable: {ollama.error}")
if warnings:
logger.warning("House health warnings: %s", "; ".join(warnings))
return SystemSnapshot(
disk=disk,
memory=memory,
ollama=ollama,
warnings=warnings,
)
async def cleanup_stale_files(
temp_dirs: list[str] | None = None,
max_age_days: int = 7,
) -> dict[str, Any]:
"""Remove files older than *max_age_days* from temp directories.
Only removes files under safe temp paths (never project source).
Args:
temp_dirs: Directories to scan. Defaults to ``["/tmp/timmy"]``.
max_age_days: Age threshold in days.
Returns:
Dict with ``deleted_count`` and ``errors``.
"""
import time
dirs = temp_dirs or ["/tmp/timmy"] # noqa: S108
cutoff = time.time() - max_age_days * 86400
deleted = 0
errors: list[str] = []
def _cleanup() -> None:
nonlocal deleted
for d in dirs:
p = Path(d)
if not p.exists():
continue
for f in p.rglob("*"):
if f.is_file():
try:
if f.stat().st_mtime < cutoff:
f.unlink()
deleted += 1
except Exception as exc:
errors.append(str(exc))
await asyncio.to_thread(_cleanup)
logger.info(
"cleanup_stale_files: deleted %d files, %d errors", deleted, len(errors)
)
return {"deleted_count": deleted, "errors": errors}

View File

@@ -1,321 +0,0 @@
"""Vassal Protocol — main orchestration loop.
Ties the backlog, dispatch, agent health, and house health modules together
into a single ``VassalOrchestrator`` that can run as a background service.
Each cycle:
1. Fetch open Gitea issues
2. Triage: score priority + route to agent
3. Dispatch: apply labels / post routing comments
4. Check agent health: nudge stuck agents
5. Check house health: log warnings, trigger cleanup if needed
6. Return a VassalCycleRecord summarising the cycle
Usage::
from timmy.vassal import vassal_orchestrator
record = await vassal_orchestrator.run_cycle()
status = vassal_orchestrator.get_status()
"""
from __future__ import annotations
import asyncio
import logging
import time
from dataclasses import dataclass, field
from datetime import UTC, datetime
from typing import Any
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# Cycle record
# ---------------------------------------------------------------------------
@dataclass
class VassalCycleRecord:
"""Summary of one orchestration cycle."""
cycle_id: int
started_at: str
finished_at: str = ""
duration_ms: int = 0
issues_fetched: int = 0
issues_dispatched: int = 0
dispatched_to_claude: int = 0
dispatched_to_kimi: int = 0
dispatched_to_timmy: int = 0
stuck_agents: list[str] = field(default_factory=list)
nudges_sent: int = 0
house_warnings: list[str] = field(default_factory=list)
cleanup_deleted: int = 0
errors: list[str] = field(default_factory=list)
@property
def healthy(self) -> bool:
return not self.errors and not self.house_warnings
# ---------------------------------------------------------------------------
# Orchestrator
# ---------------------------------------------------------------------------
class VassalOrchestrator:
"""Timmy's autonomous orchestration engine.
Runs observe → triage → dispatch → monitor → house-check cycles on a
configurable interval.
Parameters
----------
cycle_interval:
Seconds between cycles. Defaults to ``settings.vassal_cycle_interval``
when available, otherwise 300 s (5 min).
max_dispatch_per_cycle:
Cap on new dispatches per cycle to avoid spamming agents.
"""
def __init__(
self,
cycle_interval: float | None = None,
max_dispatch_per_cycle: int = 10,
) -> None:
self._cycle_count = 0
self._running = False
self._task: asyncio.Task | None = None
self._max_dispatch = max_dispatch_per_cycle
self._history: list[VassalCycleRecord] = []
# Resolve interval — lazy to avoid import-time settings read
self._cycle_interval = cycle_interval
# -- public API --------------------------------------------------------
@property
def cycle_count(self) -> int:
return self._cycle_count
@property
def is_running(self) -> bool:
return self._running
@property
def history(self) -> list[VassalCycleRecord]:
return list(self._history)
def get_status(self) -> dict[str, Any]:
"""Return a JSON-serialisable status dict."""
last = self._history[-1] if self._history else None
return {
"running": self._running,
"cycle_count": self._cycle_count,
"last_cycle": {
"cycle_id": last.cycle_id,
"started_at": last.started_at,
"issues_fetched": last.issues_fetched,
"issues_dispatched": last.issues_dispatched,
"stuck_agents": last.stuck_agents,
"house_warnings": last.house_warnings,
"healthy": last.healthy,
}
if last
else None,
}
# -- single cycle ------------------------------------------------------
async def run_cycle(self) -> VassalCycleRecord:
"""Execute one full orchestration cycle.
Gracefully degrades at each step — a failure in one sub-task does
not abort the rest of the cycle.
Returns:
VassalCycleRecord summarising what happened.
"""
self._cycle_count += 1
start = time.monotonic()
record = VassalCycleRecord(
cycle_id=self._cycle_count,
started_at=datetime.now(UTC).isoformat(),
)
# 1 + 2: Fetch & triage
await self._step_backlog(record)
# 3: Agent health
await self._step_agent_health(record)
# 4: House health
await self._step_house_health(record)
# Finalise record
record.finished_at = datetime.now(UTC).isoformat()
record.duration_ms = int((time.monotonic() - start) * 1000)
self._history.append(record)
# Broadcast via WebSocket (best-effort)
await self._broadcast(record)
logger.info(
"VassalOrchestrator cycle #%d complete (%d ms): "
"fetched=%d dispatched=%d stuck=%s house_ok=%s",
record.cycle_id,
record.duration_ms,
record.issues_fetched,
record.issues_dispatched,
record.stuck_agents or "none",
not record.house_warnings,
)
return record
# -- background loop ---------------------------------------------------
async def start(self) -> None:
"""Start the recurring orchestration loop as a background task."""
if self._running:
logger.warning("VassalOrchestrator already running")
return
self._running = True
self._task = asyncio.ensure_future(self._loop())
def stop(self) -> None:
"""Signal the loop to stop after the current cycle."""
self._running = False
if self._task and not self._task.done():
self._task.cancel()
logger.info("VassalOrchestrator stop requested")
async def _loop(self) -> None:
interval = self._resolve_interval()
logger.info("VassalOrchestrator loop started (interval=%.0fs)", interval)
while self._running:
try:
await self.run_cycle()
except Exception:
logger.exception("VassalOrchestrator cycle failed")
await asyncio.sleep(interval)
# -- step: backlog -------------------------------------------------------
async def _step_backlog(self, record: VassalCycleRecord) -> None:
from timmy.vassal.backlog import fetch_open_issues, triage_issues
from timmy.vassal.dispatch import dispatch_issue, get_dispatch_registry
try:
raw_issues = await fetch_open_issues(
limit=50,
exclude_labels=["wip", "blocked", "needs-info"],
)
record.issues_fetched = len(raw_issues)
if not raw_issues:
return
triaged = triage_issues(raw_issues)
registry = get_dispatch_registry()
dispatched = 0
for issue in triaged:
if dispatched >= self._max_dispatch:
break
# Skip already-dispatched issues
if issue.number in registry:
continue
await dispatch_issue(issue)
dispatched += 1
from timmy.vassal.backlog import AgentTarget
if issue.agent_target == AgentTarget.CLAUDE:
record.dispatched_to_claude += 1
elif issue.agent_target == AgentTarget.KIMI:
record.dispatched_to_kimi += 1
else:
record.dispatched_to_timmy += 1
record.issues_dispatched = dispatched
except Exception as exc:
logger.exception("_step_backlog failed")
record.errors.append(f"backlog: {exc}")
# -- step: agent health -------------------------------------------------
async def _step_agent_health(self, record: VassalCycleRecord) -> None:
from config import settings
from timmy.vassal.agent_health import get_full_health_report, nudge_stuck_agent
try:
threshold = getattr(settings, "vassal_stuck_threshold_minutes", 120)
report = await get_full_health_report(stuck_threshold_minutes=threshold)
for agent_status in report.agents:
if agent_status.is_stuck:
record.stuck_agents.append(agent_status.agent)
for issue_num in agent_status.stuck_issue_numbers:
ok = await nudge_stuck_agent(agent_status.agent, issue_num)
if ok:
record.nudges_sent += 1
except Exception as exc:
logger.exception("_step_agent_health failed")
record.errors.append(f"agent_health: {exc}")
# -- step: house health -------------------------------------------------
async def _step_house_health(self, record: VassalCycleRecord) -> None:
from timmy.vassal.house_health import cleanup_stale_files, get_system_snapshot
try:
snapshot = await get_system_snapshot()
record.house_warnings = snapshot.warnings
# Auto-cleanup temp files when disk is getting tight
if snapshot.disk.percent_used >= 80.0:
result = await cleanup_stale_files(max_age_days=3)
record.cleanup_deleted = result.get("deleted_count", 0)
except Exception as exc:
logger.exception("_step_house_health failed")
record.errors.append(f"house_health: {exc}")
# -- helpers ------------------------------------------------------------
def _resolve_interval(self) -> float:
if self._cycle_interval is not None:
return self._cycle_interval
try:
from config import settings
return float(getattr(settings, "vassal_cycle_interval", 300))
except Exception:
return 300.0
async def _broadcast(self, record: VassalCycleRecord) -> None:
try:
from infrastructure.ws_manager.handler import ws_manager
await ws_manager.broadcast(
"vassal.cycle",
{
"cycle_id": record.cycle_id,
"started_at": record.started_at,
"issues_fetched": record.issues_fetched,
"issues_dispatched": record.issues_dispatched,
"stuck_agents": record.stuck_agents,
"house_warnings": record.house_warnings,
"duration_ms": record.duration_ms,
"healthy": record.healthy,
},
)
except Exception as exc:
logger.debug("VassalOrchestrator broadcast skipped: %s", exc)

View File

@@ -1,503 +0,0 @@
"""Tests for the agent dispatcher (timmy.dispatcher)."""
from __future__ import annotations
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.dispatcher import (
AGENT_REGISTRY,
AgentType,
DispatchResult,
DispatchStatus,
TaskType,
_dispatch_local,
_dispatch_via_api,
_dispatch_via_gitea,
dispatch_task,
infer_task_type,
select_agent,
wait_for_completion,
)
# ---------------------------------------------------------------------------
# Agent registry
# ---------------------------------------------------------------------------
class TestAgentRegistry:
def test_all_agents_present(self):
for member in AgentType:
assert member in AGENT_REGISTRY, f"AgentType.{member.name} missing from registry"
def test_agent_specs_have_display_names(self):
for agent, spec in AGENT_REGISTRY.items():
assert spec.display_name, f"{agent} has empty display_name"
def test_gitea_agents_have_labels(self):
for agent, spec in AGENT_REGISTRY.items():
if spec.interface == "gitea":
assert spec.gitea_label, f"{agent} is gitea interface but has no label"
def test_non_gitea_agents_have_no_labels(self):
for agent, spec in AGENT_REGISTRY.items():
if spec.interface not in ("gitea",):
# api and local agents may have no label
assert spec.gitea_label is None or spec.interface == "gitea"
def test_max_concurrent_positive(self):
for agent, spec in AGENT_REGISTRY.items():
assert spec.max_concurrent >= 1, f"{agent} has max_concurrent < 1"
# ---------------------------------------------------------------------------
# select_agent
# ---------------------------------------------------------------------------
class TestSelectAgent:
def test_architecture_routes_to_claude(self):
assert select_agent(TaskType.ARCHITECTURE) == AgentType.CLAUDE_CODE
def test_refactoring_routes_to_claude(self):
assert select_agent(TaskType.REFACTORING) == AgentType.CLAUDE_CODE
def test_code_review_routes_to_claude(self):
assert select_agent(TaskType.CODE_REVIEW) == AgentType.CLAUDE_CODE
def test_routine_coding_routes_to_kimi(self):
assert select_agent(TaskType.ROUTINE_CODING) == AgentType.KIMI_CODE
def test_fast_iteration_routes_to_kimi(self):
assert select_agent(TaskType.FAST_ITERATION) == AgentType.KIMI_CODE
def test_research_routes_to_agent_api(self):
assert select_agent(TaskType.RESEARCH) == AgentType.AGENT_API
def test_triage_routes_to_timmy(self):
assert select_agent(TaskType.TRIAGE) == AgentType.TIMMY
def test_planning_routes_to_timmy(self):
assert select_agent(TaskType.PLANNING) == AgentType.TIMMY
# ---------------------------------------------------------------------------
# infer_task_type
# ---------------------------------------------------------------------------
class TestInferTaskType:
def test_architecture_keyword(self):
assert infer_task_type("Design the LLM router architecture") == TaskType.ARCHITECTURE
def test_refactor_keyword(self):
assert infer_task_type("Refactor the auth middleware") == TaskType.REFACTORING
def test_code_review_keyword(self):
assert infer_task_type("Review PR for cascade router") == TaskType.CODE_REVIEW
def test_research_keyword(self):
assert infer_task_type("Research embedding models") == TaskType.RESEARCH
def test_triage_keyword(self):
assert infer_task_type("Triage open issues") == TaskType.TRIAGE
def test_planning_keyword(self):
assert infer_task_type("Plan the v2.0 roadmap") == TaskType.PLANNING
def test_fallback_returns_routine_coding(self):
assert infer_task_type("Do the thing") == TaskType.ROUTINE_CODING
def test_description_contributes_to_inference(self):
result = infer_task_type("Implement feature", "We need to refactor the old code")
assert result == TaskType.REFACTORING
def test_case_insensitive(self):
assert infer_task_type("ARCHITECTURE DESIGN") == TaskType.ARCHITECTURE
# ---------------------------------------------------------------------------
# DispatchResult
# ---------------------------------------------------------------------------
class TestDispatchResult:
def test_success_when_assigned(self):
r = DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=AgentType.KIMI_CODE,
issue_number=1,
status=DispatchStatus.ASSIGNED,
)
assert r.success is True
def test_success_when_completed(self):
r = DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=AgentType.KIMI_CODE,
issue_number=1,
status=DispatchStatus.COMPLETED,
)
assert r.success is True
def test_not_success_when_failed(self):
r = DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=AgentType.KIMI_CODE,
issue_number=1,
status=DispatchStatus.FAILED,
)
assert r.success is False
def test_not_success_when_escalated(self):
r = DispatchResult(
task_type=TaskType.ROUTINE_CODING,
agent=AgentType.KIMI_CODE,
issue_number=1,
status=DispatchStatus.ESCALATED,
)
assert r.success is False
# ---------------------------------------------------------------------------
# _dispatch_local
# ---------------------------------------------------------------------------
class TestDispatchLocal:
async def test_returns_assigned(self):
result = await _dispatch_local(
title="Plan the migration",
description="We need a plan.",
acceptance_criteria=["Plan is documented"],
issue_number=42,
)
assert result.status == DispatchStatus.ASSIGNED
assert result.agent == AgentType.TIMMY
assert result.issue_number == 42
async def test_infers_task_type(self):
result = await _dispatch_local(
title="Plan the sprint",
description="",
acceptance_criteria=[],
)
assert result.task_type == TaskType.PLANNING
async def test_no_issue_number(self):
result = await _dispatch_local(title="Do something", description="")
assert result.issue_number is None
# ---------------------------------------------------------------------------
# _dispatch_via_api
# ---------------------------------------------------------------------------
class TestDispatchViaApi:
async def test_no_endpoint_returns_failed(self):
result = await _dispatch_via_api(
agent=AgentType.AGENT_API,
title="Analyse logs",
description="",
acceptance_criteria=[],
)
assert result.status == DispatchStatus.FAILED
assert "No API endpoint" in (result.error or "")
async def test_successful_api_call(self):
mock_resp = MagicMock()
mock_resp.status_code = 202
mock_resp.content = b'{"ok": true}'
mock_resp.json.return_value = {"ok": True}
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(return_value=mock_resp)
with patch("httpx.AsyncClient", return_value=mock_client):
result = await _dispatch_via_api(
agent=AgentType.AGENT_API,
title="Analyse logs",
description="Look at the logs",
acceptance_criteria=["Report produced"],
endpoint="http://fake-agent/dispatch",
)
assert result.status == DispatchStatus.ASSIGNED
assert result.agent == AgentType.AGENT_API
async def test_api_error_returns_failed(self):
mock_resp = MagicMock()
mock_resp.status_code = 500
mock_resp.text = "Internal Server Error"
mock_client = AsyncMock()
mock_client.__aenter__ = AsyncMock(return_value=mock_client)
mock_client.__aexit__ = AsyncMock(return_value=False)
mock_client.post = AsyncMock(return_value=mock_resp)
with patch("httpx.AsyncClient", return_value=mock_client):
result = await _dispatch_via_api(
agent=AgentType.AGENT_API,
title="Analyse logs",
description="",
acceptance_criteria=[],
endpoint="http://fake-agent/dispatch",
)
assert result.status == DispatchStatus.FAILED
assert "500" in (result.error or "")
# ---------------------------------------------------------------------------
# _dispatch_via_gitea
# ---------------------------------------------------------------------------
_GITEA_SETTINGS = MagicMock(
gitea_enabled=True,
gitea_token="test-token",
gitea_url="http://gitea.test",
gitea_repo="owner/repo",
)
class TestDispatchViaGitea:
def _make_client(self, label_list=None, label_create_status=201, comment_status=201):
"""Build a mock httpx.AsyncClient for Gitea interactions."""
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = label_list or []
create_label_resp = MagicMock()
create_label_resp.status_code = label_create_status
create_label_resp.json.return_value = {"id": 99}
apply_label_resp = MagicMock()
apply_label_resp.status_code = 201
comment_resp = MagicMock()
comment_resp.status_code = comment_status
comment_resp.json.return_value = {"id": 7}
client = AsyncMock()
client.__aenter__ = AsyncMock(return_value=client)
client.__aexit__ = AsyncMock(return_value=False)
client.get = AsyncMock(return_value=label_resp)
client.post = AsyncMock(side_effect=[create_label_resp, apply_label_resp, comment_resp])
return client
async def test_successful_gitea_dispatch(self):
client = self._make_client()
with (
patch("httpx.AsyncClient", return_value=client),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
issue_number=1072,
title="Design the router",
description="We need a cascade router.",
acceptance_criteria=["Failover works"],
)
assert result.success
assert result.agent == AgentType.CLAUDE_CODE
assert result.issue_number == 1072
assert result.status == DispatchStatus.ASSIGNED
async def test_no_gitea_token_returns_failed(self):
bad_settings = MagicMock(gitea_enabled=True, gitea_token="", gitea_url="http://x", gitea_repo="a/b")
with patch("timmy.dispatcher.settings", bad_settings):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
issue_number=1,
title="Some task",
description="",
acceptance_criteria=[],
)
assert result.status == DispatchStatus.FAILED
assert "not configured" in (result.error or "").lower()
async def test_gitea_disabled_returns_failed(self):
bad_settings = MagicMock(gitea_enabled=False, gitea_token="tok", gitea_url="http://x", gitea_repo="a/b")
with patch("timmy.dispatcher.settings", bad_settings):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
issue_number=1,
title="Some task",
description="",
acceptance_criteria=[],
)
assert result.status == DispatchStatus.FAILED
async def test_existing_label_reused(self):
"""When the label already exists, it should be reused (no creation call)."""
label_resp = MagicMock()
label_resp.status_code = 200
label_resp.json.return_value = [{"name": "claude-ready", "id": 55}]
apply_resp = MagicMock()
apply_resp.status_code = 201
comment_resp = MagicMock()
comment_resp.status_code = 201
comment_resp.json.return_value = {"id": 8}
client = AsyncMock()
client.__aenter__ = AsyncMock(return_value=client)
client.__aexit__ = AsyncMock(return_value=False)
client.get = AsyncMock(return_value=label_resp)
client.post = AsyncMock(side_effect=[apply_resp, comment_resp])
with (
patch("httpx.AsyncClient", return_value=client),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
):
result = await _dispatch_via_gitea(
agent=AgentType.CLAUDE_CODE,
issue_number=10,
title="Architecture task",
description="",
acceptance_criteria=[],
)
assert result.success
# Should only have 2 POST calls: apply label + comment (no label creation)
assert client.post.call_count == 2
# ---------------------------------------------------------------------------
# dispatch_task (integration-style)
# ---------------------------------------------------------------------------
class TestDispatchTask:
async def test_empty_title_returns_failed(self):
result = await dispatch_task(title=" ")
assert result.status == DispatchStatus.FAILED
assert "`title` is required" in (result.error or "")
async def test_local_dispatch_for_timmy_task(self):
result = await dispatch_task(
title="Triage the open issues",
description="We have 40 open issues.",
acceptance_criteria=["Issues are labelled"],
task_type=TaskType.TRIAGE,
)
assert result.agent == AgentType.TIMMY
assert result.success
async def test_explicit_agent_override(self):
"""Caller can force a specific agent regardless of task type."""
result = await dispatch_task(
title="Triage the open issues",
agent=AgentType.TIMMY,
)
assert result.agent == AgentType.TIMMY
async def test_gitea_dispatch_when_issue_provided(self):
client_mock = AsyncMock()
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
client_mock.__aexit__ = AsyncMock(return_value=False)
client_mock.get = AsyncMock(return_value=MagicMock(status_code=200, json=MagicMock(return_value=[])))
create_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 1}))
apply_resp = MagicMock(status_code=201)
comment_resp = MagicMock(status_code=201, json=MagicMock(return_value={"id": 5}))
client_mock.post = AsyncMock(side_effect=[create_resp, apply_resp, comment_resp])
with (
patch("httpx.AsyncClient", return_value=client_mock),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
):
result = await dispatch_task(
title="Design the cascade router",
description="Architecture task.",
task_type=TaskType.ARCHITECTURE,
issue_number=1072,
)
assert result.agent == AgentType.CLAUDE_CODE
assert result.success
async def test_escalation_after_max_retries(self):
"""If all attempts fail, the result is ESCALATED."""
with (
patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch,
patch("timmy.dispatcher._log_escalation", new_callable=AsyncMock),
):
mock_dispatch.return_value = DispatchResult(
task_type=TaskType.ARCHITECTURE,
agent=AgentType.CLAUDE_CODE,
issue_number=1,
status=DispatchStatus.FAILED,
error="Gitea offline",
)
result = await dispatch_task(
title="Design router",
task_type=TaskType.ARCHITECTURE,
issue_number=1,
max_retries=1,
)
assert result.status == DispatchStatus.ESCALATED
assert mock_dispatch.call_count == 2 # initial + 1 retry
async def test_no_retry_on_success(self):
with patch("timmy.dispatcher._dispatch_via_gitea", new_callable=AsyncMock) as mock_dispatch:
mock_dispatch.return_value = DispatchResult(
task_type=TaskType.ARCHITECTURE,
agent=AgentType.CLAUDE_CODE,
issue_number=1,
status=DispatchStatus.ASSIGNED,
comment_id=42,
label_applied="claude-ready",
)
result = await dispatch_task(
title="Design router",
task_type=TaskType.ARCHITECTURE,
issue_number=1,
max_retries=2,
)
assert result.success
assert mock_dispatch.call_count == 1 # no retries needed
# ---------------------------------------------------------------------------
# wait_for_completion
# ---------------------------------------------------------------------------
class TestWaitForCompletion:
async def test_returns_completed_when_issue_closed(self):
closed_resp = MagicMock(
status_code=200,
json=MagicMock(return_value={"state": "closed"}),
)
client_mock = AsyncMock()
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
client_mock.__aexit__ = AsyncMock(return_value=False)
client_mock.get = AsyncMock(return_value=closed_resp)
with (
patch("httpx.AsyncClient", return_value=client_mock),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
):
status = await wait_for_completion(issue_number=42, poll_interval=0, max_wait=5)
assert status == DispatchStatus.COMPLETED
async def test_returns_timed_out_when_still_open(self):
open_resp = MagicMock(
status_code=200,
json=MagicMock(return_value={"state": "open"}),
)
client_mock = AsyncMock()
client_mock.__aenter__ = AsyncMock(return_value=client_mock)
client_mock.__aexit__ = AsyncMock(return_value=False)
client_mock.get = AsyncMock(return_value=open_resp)
with (
patch("httpx.AsyncClient", return_value=client_mock),
patch("timmy.dispatcher.settings", _GITEA_SETTINGS),
patch("asyncio.sleep", new_callable=AsyncMock),
):
status = await wait_for_completion(issue_number=42, poll_interval=1, max_wait=2)
assert status == DispatchStatus.TIMED_OUT

View File

@@ -1,621 +0,0 @@
"""Unit tests for timmy.backlog_triage — autonomous backlog triage loop."""
from datetime import UTC, datetime
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from timmy.backlog_triage import (
AGENT_CLAUDE,
AGENT_KIMI,
KIMI_READY_LABEL,
OWNER_LOGIN,
READY_THRESHOLD,
BacklogTriageLoop,
ScoredIssue,
TriageCycleResult,
TriageDecision,
_build_audit_comment,
_build_daily_summary,
_extract_tags,
_score_acceptance,
_score_alignment,
_score_scope,
decide,
score_issue,
)
# ── Fixtures ─────────────────────────────────────────────────────────────────
def _make_raw_issue(
number: int = 1,
title: str = "Fix the login bug",
body: str = "## Problem\nLogin fails on empty password.\n\n## Steps\nassert response == 200",
labels: list | None = None,
assignees: list | None = None,
created_at: str = "2026-03-20T10:00:00Z",
) -> dict:
return {
"number": number,
"title": title,
"body": body,
"labels": [{"name": lbl} for lbl in (labels or [])],
"assignees": [{"login": a} for a in (assignees or [])],
"created_at": created_at,
}
def _make_scored_issue(
number: int = 1,
title: str = "Fix login bug",
issue_type: str = "bug",
score: int = 7,
ready: bool = True,
is_p0: bool = True,
is_blocked: bool = False,
assignees: list | None = None,
tags: set | None = None,
labels: list | None = None,
age_days: int = 3,
) -> ScoredIssue:
return ScoredIssue(
number=number,
title=title,
body="",
labels=labels or [],
tags=tags or {"bug"},
assignees=assignees or [],
created_at=datetime.now(UTC),
issue_type=issue_type,
score=score,
scope=2,
acceptance=2,
alignment=3,
ready=ready,
age_days=age_days,
is_p0=is_p0,
is_blocked=is_blocked,
)
# ── _extract_tags ─────────────────────────────────────────────────────────────
class TestExtractTags:
def test_bracket_tags_in_title(self):
tags = _extract_tags("[Bug] Login fails", [])
assert "bug" in tags
def test_multiple_brackets(self):
tags = _extract_tags("[Bug][P0] Crash on startup", [])
assert "bug" in tags
assert "p0" in tags
def test_label_names(self):
tags = _extract_tags("Fix thing", ["security", "hotfix"])
assert "security" in tags
assert "hotfix" in tags
def test_labels_lowercased(self):
tags = _extract_tags("Title", ["Bug", "FEATURE"])
assert "bug" in tags
assert "feature" in tags
def test_empty_inputs(self):
tags = _extract_tags("", [])
assert tags == set()
# ── Scoring functions ─────────────────────────────────────────────────────────
class TestScoreScope:
def test_file_reference_adds_point(self):
score = _score_scope("Fix auth", "Edit src/timmy/auth.py", set())
assert score >= 1
def test_function_reference_adds_point(self):
score = _score_scope("Fix auth", "def validate_token()", set())
assert score >= 1
def test_short_title_adds_point(self):
score = _score_scope("Short title", "", set())
assert score >= 1
def test_meta_tag_penalizes(self):
score = _score_scope("Discussion about philosophy", "long body " * 5, {"philosophy"})
assert score <= 1
def test_max_score_3(self):
score = _score_scope("Fix auth", "src/auth.py\ndef login()", set())
assert score <= 3
class TestScoreAcceptance:
def test_acceptance_keywords(self):
body = "should return 200\nmust pass tests\nexpect response"
score = _score_acceptance("Title", body, set())
assert score >= 2
def test_test_reference_adds_point(self):
score = _score_acceptance("Title", "Run tox -e unit", set())
assert score >= 1
def test_structured_sections(self):
body = "## Problem\nX\n## Solution\nY"
score = _score_acceptance("Title", body, set())
assert score >= 1
def test_meta_tag_penalizes(self):
score = _score_acceptance("Title", "should do something", {"philosophy"})
# still counts but penalized
assert score <= 2
def test_empty_body(self):
score = _score_acceptance("Title", "", set())
assert score == 0
class TestScoreAlignment:
def test_bug_tags_score_max(self):
assert _score_alignment("", "", {"bug"}) == 3
def test_hotfix_tag_max(self):
assert _score_alignment("", "", {"hotfix"}) == 3
def test_refactor_tag(self):
score = _score_alignment("", "", {"refactor"})
assert score >= 2
def test_feature_tag(self):
score = _score_alignment("", "", {"feature"})
assert score >= 2
def test_meta_tags_zero(self):
assert _score_alignment("", "", {"philosophy"}) == 0
def test_loop_generated_bonus(self):
score = _score_alignment("", "", {"loop-generated"})
assert score >= 1
# ── score_issue ───────────────────────────────────────────────────────────────
class TestScoreIssue:
def test_bug_issue_classified_correctly(self):
raw = _make_raw_issue(labels=["bug"], title="[Bug] Crash on startup")
scored = score_issue(raw)
assert scored.issue_type == "bug"
assert scored.is_p0 is True
def test_feature_issue_classified(self):
raw = _make_raw_issue(labels=["feature"], title="Add voice support")
scored = score_issue(raw)
assert scored.issue_type == "feature"
def test_philosophy_issue_classified(self):
raw = _make_raw_issue(labels=["philosophy"], title="[Philosophy] Should Timmy sleep?")
scored = score_issue(raw)
assert scored.issue_type == "philosophy"
def test_research_issue_classified(self):
raw = _make_raw_issue(labels=["research"], title="Investigate model options")
scored = score_issue(raw)
assert scored.issue_type == "research"
def test_ready_flag_set_when_score_high(self):
body = (
"## Problem\nX breaks.\n## Solution\nFix src/timmy/agent.py def run()\n"
"should return True\nmust pass tox -e unit"
)
raw = _make_raw_issue(labels=["bug"], body=body)
scored = score_issue(raw)
assert scored.score >= READY_THRESHOLD
assert scored.ready is True
def test_is_blocked_detected_in_body(self):
raw = _make_raw_issue(body="This is blocked by issue #50")
scored = score_issue(raw)
assert scored.is_blocked is True
def test_is_blocked_detected_in_title(self):
raw = _make_raw_issue(title="[blocking] Cannot proceed")
scored = score_issue(raw)
# "blocking" in brackets becomes a tag
assert scored.is_blocked is True
def test_unassigned_when_no_assignees(self):
raw = _make_raw_issue(assignees=[])
scored = score_issue(raw)
assert scored.is_unassigned is True
def test_assigned_when_has_assignee(self):
raw = _make_raw_issue(assignees=["claude"])
scored = score_issue(raw)
assert scored.is_unassigned is False
def test_age_days_computed(self):
old_ts = "2026-01-01T00:00:00Z"
raw = _make_raw_issue(created_at=old_ts)
scored = score_issue(raw)
assert scored.age_days > 0
def test_needs_kimi_for_research_label(self):
raw = _make_raw_issue(labels=["kimi-ready"])
scored = score_issue(raw)
assert scored.needs_kimi is True
# ── decide ────────────────────────────────────────────────────────────────────
class TestDecide:
def test_philosophy_skipped(self):
issue = _make_scored_issue(issue_type="philosophy", tags={"philosophy"})
d = decide(issue)
assert d.action == "skip"
assert "philosophy" in d.reason.lower()
def test_assigned_issue_skipped(self):
issue = _make_scored_issue(assignees=["perplexity"])
d = decide(issue)
assert d.action == "skip"
assert "assigned" in d.reason.lower()
def test_low_score_skipped(self):
issue = _make_scored_issue(score=2, ready=False)
d = decide(issue)
assert d.action == "skip"
assert "threshold" in d.reason.lower()
def test_blocked_issue_flagged_for_alex(self):
issue = _make_scored_issue(is_blocked=True)
d = decide(issue)
assert d.action == "flag_alex"
assert d.agent == OWNER_LOGIN
def test_research_issue_assigned_kimi(self):
issue = _make_scored_issue(
issue_type="research",
tags={"research"},
is_p0=False,
is_blocked=False,
)
d = decide(issue)
assert d.action == "assign_kimi"
assert d.agent == AGENT_KIMI
def test_kimi_ready_label_assigns_kimi(self):
issue = _make_scored_issue(
issue_type="unknown",
tags={"kimi-ready"},
labels=["kimi-ready"],
is_p0=False,
is_blocked=False,
)
d = decide(issue)
assert d.action == "assign_kimi"
def test_p0_bug_assigns_claude(self):
issue = _make_scored_issue(issue_type="bug", is_p0=True, is_blocked=False)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_ready_feature_assigns_claude(self):
issue = _make_scored_issue(
issue_type="feature",
is_p0=False,
is_blocked=False,
tags={"feature"},
)
d = decide(issue)
assert d.action == "assign_claude"
assert d.agent == AGENT_CLAUDE
def test_decision_has_reason(self):
issue = _make_scored_issue()
d = decide(issue)
assert len(d.reason) > 10
# ── _build_audit_comment ──────────────────────────────────────────────────────
class TestBuildAuditComment:
def test_contains_timmy_triage_header(self):
d = TriageDecision(42, "assign_claude", "High priority bug", agent=AGENT_CLAUDE)
comment = _build_audit_comment(d)
assert "Timmy Triage" in comment
def test_contains_issue_reason(self):
d = TriageDecision(42, "assign_claude", "Urgent P0 bug", agent=AGENT_CLAUDE)
comment = _build_audit_comment(d)
assert "Urgent P0 bug" in comment
def test_assign_claude_mentions_agent(self):
d = TriageDecision(42, "assign_claude", "reason", agent=AGENT_CLAUDE)
comment = _build_audit_comment(d)
assert AGENT_CLAUDE in comment
def test_assign_kimi_mentions_label(self):
d = TriageDecision(42, "assign_kimi", "reason", agent=AGENT_KIMI)
comment = _build_audit_comment(d)
assert KIMI_READY_LABEL in comment
def test_flag_alex_mentions_owner(self):
d = TriageDecision(42, "flag_alex", "blocked", agent=OWNER_LOGIN)
comment = _build_audit_comment(d)
assert OWNER_LOGIN in comment
def test_contains_override_note(self):
d = TriageDecision(42, "assign_claude", "reason", agent=AGENT_CLAUDE)
comment = _build_audit_comment(d)
assert "override" in comment.lower()
# ── _build_daily_summary ──────────────────────────────────────────────────────
class TestBuildDailySummary:
def _make_result(self, decisions=None) -> TriageCycleResult:
return TriageCycleResult(
timestamp=datetime.now(UTC).isoformat(),
total_open=10,
scored=8,
ready=5,
decisions=decisions or [],
)
def test_contains_open_count(self):
result = self._make_result()
scored = [_make_scored_issue(number=i, ready=True, score=6) for i in range(1, 4)]
summary = _build_daily_summary(result, scored)
assert "10" in summary # total_open
def test_contains_ready_count(self):
result = self._make_result()
summary = _build_daily_summary(result, [])
assert "5" in summary
def test_actions_taken_section(self):
decisions = [
TriageDecision(1, "assign_claude", "P0 bug", agent="claude", executed=True),
]
result = self._make_result(decisions=decisions)
summary = _build_daily_summary(result, [])
assert "Actions Taken" in summary
assert "#1" in summary
def test_top_issues_listed(self):
scored = [_make_scored_issue(number=99, ready=True, score=8)]
result = self._make_result()
summary = _build_daily_summary(result, scored)
assert "#99" in summary
def test_footer_present(self):
summary = _build_daily_summary(self._make_result(), [])
assert "Auto-generated" in summary
# ── BacklogTriageLoop ─────────────────────────────────────────────────────────
class TestBacklogTriageLoop:
def test_default_interval_from_settings(self):
loop = BacklogTriageLoop()
from config import settings
assert loop._interval == float(settings.backlog_triage_interval_seconds)
def test_custom_interval(self):
loop = BacklogTriageLoop(interval=300)
assert loop._interval == 300.0
def test_dry_run_default(self):
loop = BacklogTriageLoop(dry_run=True)
assert loop._dry_run is True
def test_not_running_initially(self):
loop = BacklogTriageLoop()
assert loop.is_running is False
def test_stop_sets_running_false(self):
loop = BacklogTriageLoop()
loop._running = True
loop.stop()
assert loop._running is False
def test_cycle_count_starts_zero(self):
loop = BacklogTriageLoop()
assert loop.cycle_count == 0
@pytest.mark.asyncio
async def test_run_once_skips_when_no_gitea_token(self):
loop = BacklogTriageLoop()
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = ""
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = False
mock_settings.backlog_triage_daily_summary = False
with patch("timmy.backlog_triage.settings", mock_settings):
result = await loop.run_once()
assert result.total_open == 0
@pytest.mark.asyncio
async def test_run_once_dry_run_no_api_writes(self):
"""In dry_run mode, decisions are made but no Gitea API writes happen."""
loop = BacklogTriageLoop(dry_run=True, daily_summary=False)
raw_issues = [
_make_raw_issue(
number=10,
title="Fix crash",
labels=["bug"],
body=(
"## Problem\nCrash on login.\n## Solution\nFix src/auth.py "
"def login()\nshould return 200\nmust pass tox tests"
),
)
]
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = True
mock_settings.backlog_triage_daily_summary = False
mock_client = AsyncMock()
mock_client.get.return_value = MagicMock(
status_code=200, json=MagicMock(return_value=raw_issues)
)
mock_ctx = AsyncMock()
mock_ctx.__aenter__.return_value = mock_client
mock_ctx.__aexit__.return_value = False
with (
patch("timmy.backlog_triage.settings", mock_settings),
patch("httpx.AsyncClient", return_value=mock_ctx),
):
result = await loop.run_once()
# No POST/PATCH calls in dry run
mock_client.post.assert_not_called()
mock_client.patch.assert_not_called()
assert result.total_open == 1
assert loop.cycle_count == 1
assert len(loop.history) == 1
@pytest.mark.asyncio
async def test_run_once_assigns_unassigned_bug(self):
"""Unassigned ready bug should be assigned to Claude with audit comment."""
loop = BacklogTriageLoop(dry_run=False, daily_summary=False)
body = (
"## Problem\nCrash on login.\n## Solution\nFix src/auth.py "
"def login()\nshould return 200\nmust pass tox tests"
)
raw_issues = [_make_raw_issue(number=5, title="Fix crash", labels=["bug"], body=body)]
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "fake-token"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = False
mock_settings.backlog_triage_daily_summary = False
# GET /issues returns our issue
get_issues_resp = MagicMock(status_code=200)
get_issues_resp.json.return_value = raw_issues
# POST /comments returns success
comment_resp = MagicMock(status_code=201)
comment_resp.json.return_value = {"id": 1}
# PATCH /issues/{n} (assign) returns success
assign_resp = MagicMock(status_code=200)
assign_resp.json.return_value = {"number": 5}
mock_client = AsyncMock()
mock_client.get.return_value = get_issues_resp
mock_client.post.return_value = comment_resp
mock_client.patch.return_value = assign_resp
mock_ctx = AsyncMock()
mock_ctx.__aenter__.return_value = mock_client
mock_ctx.__aexit__.return_value = False
with (
patch("timmy.backlog_triage.settings", mock_settings),
patch("httpx.AsyncClient", return_value=mock_ctx),
patch("asyncio.sleep", new_callable=AsyncMock),
):
result = await loop.run_once()
assert result.total_open == 1
# Comment should have been posted
mock_client.post.assert_called()
# Assign should have been called (PATCH)
mock_client.patch.assert_called()
@pytest.mark.asyncio
async def test_run_once_skips_already_assigned(self):
"""Issues already assigned should not be acted upon."""
loop = BacklogTriageLoop(dry_run=False, daily_summary=False)
raw_issues = [
_make_raw_issue(
number=3,
labels=["bug"],
assignees=["perplexity"],
body="## Problem\nX\nmust pass tox\nshould return 200 at least 3 times",
)
]
mock_settings = MagicMock()
mock_settings.gitea_enabled = True
mock_settings.gitea_token = "tok"
mock_settings.gitea_repo = "owner/repo"
mock_settings.gitea_url = "http://gitea.local"
mock_settings.backlog_triage_interval_seconds = 900
mock_settings.backlog_triage_dry_run = False
mock_settings.backlog_triage_daily_summary = False
get_resp = MagicMock(status_code=200)
get_resp.json.return_value = raw_issues
mock_client = AsyncMock()
mock_client.get.return_value = get_resp
mock_ctx = AsyncMock()
mock_ctx.__aenter__.return_value = mock_client
mock_ctx.__aexit__.return_value = False
with (
patch("timmy.backlog_triage.settings", mock_settings),
patch("httpx.AsyncClient", return_value=mock_ctx),
):
result = await loop.run_once()
# No writes for already-assigned issue
mock_client.post.assert_not_called()
mock_client.patch.assert_not_called()
assert result.decisions[0].action == "skip"
# ── ScoredIssue properties ────────────────────────────────────────────────────
class TestScoredIssueProperties:
def test_is_unassigned_true_when_no_assignees(self):
issue = _make_scored_issue(assignees=[])
assert issue.is_unassigned is True
def test_is_unassigned_false_when_assigned(self):
issue = _make_scored_issue(assignees=["claude"])
assert issue.is_unassigned is False
def test_needs_kimi_for_research_tag(self):
issue = _make_scored_issue(tags={"research"})
assert issue.needs_kimi is True
def test_needs_kimi_for_kimi_ready_label(self):
issue = _make_scored_issue(labels=["kimi-ready"], tags=set())
assert issue.needs_kimi is True
def test_needs_kimi_false_for_bug(self):
issue = _make_scored_issue(tags={"bug"}, labels=[])
assert issue.needs_kimi is False

View File

@@ -1,103 +0,0 @@
"""Unit tests for timmy.vassal.agent_health."""
from __future__ import annotations
import pytest
from timmy.vassal.agent_health import AgentHealthReport, AgentStatus
# ---------------------------------------------------------------------------
# AgentStatus
# ---------------------------------------------------------------------------
def test_agent_status_idle_default():
s = AgentStatus(agent="claude")
assert s.is_idle is True
assert s.is_stuck is False
assert s.needs_reassignment is False
def test_agent_status_active():
s = AgentStatus(agent="kimi", active_issue_numbers=[10, 11])
s.is_idle = len(s.active_issue_numbers) == 0
assert s.is_idle is False
def test_agent_status_stuck():
s = AgentStatus(
agent="claude",
active_issue_numbers=[7],
stuck_issue_numbers=[7],
is_idle=False,
)
assert s.is_stuck is True
assert s.needs_reassignment is True
# ---------------------------------------------------------------------------
# AgentHealthReport
# ---------------------------------------------------------------------------
def test_report_any_stuck():
claude = AgentStatus(agent="claude", stuck_issue_numbers=[3])
kimi = AgentStatus(agent="kimi")
report = AgentHealthReport(agents=[claude, kimi])
assert report.any_stuck is True
def test_report_all_idle():
report = AgentHealthReport(
agents=[AgentStatus(agent="claude"), AgentStatus(agent="kimi")]
)
assert report.all_idle is True
def test_report_for_agent_found():
kimi = AgentStatus(agent="kimi", active_issue_numbers=[42])
report = AgentHealthReport(agents=[AgentStatus(agent="claude"), kimi])
found = report.for_agent("kimi")
assert found is kimi
def test_report_for_agent_not_found():
report = AgentHealthReport(agents=[AgentStatus(agent="claude")])
assert report.for_agent("timmy") is None
# ---------------------------------------------------------------------------
# check_agent_health — no Gitea in unit tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_check_agent_health_unknown_agent():
"""Unknown agent name returns idle status without error."""
from timmy.vassal.agent_health import check_agent_health
status = await check_agent_health("unknown-bot")
assert status.agent == "unknown-bot"
assert status.is_idle is True
@pytest.mark.asyncio
async def test_check_agent_health_no_token():
"""Returns idle status gracefully when Gitea token is absent."""
from timmy.vassal.agent_health import check_agent_health
status = await check_agent_health("claude")
# Should not raise; returns idle (no active issues discovered)
assert isinstance(status, AgentStatus)
assert status.agent == "claude"
@pytest.mark.asyncio
async def test_get_full_health_report_returns_both_agents():
from timmy.vassal.agent_health import get_full_health_report
report = await get_full_health_report()
agent_names = {a.agent for a in report.agents}
assert "claude" in agent_names
assert "kimi" in agent_names

View File

@@ -1,186 +0,0 @@
"""Unit tests for timmy.vassal.backlog — triage and fetch helpers."""
from __future__ import annotations
import pytest
from timmy.vassal.backlog import (
AgentTarget,
TriagedIssue,
_choose_agent,
_extract_labels,
_score_priority,
triage_issues,
)
# ---------------------------------------------------------------------------
# _extract_labels
# ---------------------------------------------------------------------------
def test_extract_labels_empty():
assert _extract_labels({}) == []
def test_extract_labels_normalises_case():
issue = {"labels": [{"name": "HIGH"}, {"name": "Feature"}]}
assert _extract_labels(issue) == ["high", "feature"]
# ---------------------------------------------------------------------------
# _score_priority
# ---------------------------------------------------------------------------
def test_priority_urgent():
assert _score_priority(["urgent"], []) == 100
def test_priority_high():
assert _score_priority(["high"], []) == 75
def test_priority_normal_default():
assert _score_priority([], []) == 50
def test_priority_assigned_penalised():
# already assigned → subtract 20
score = _score_priority([], ["some-agent"])
assert score == 30
def test_priority_label_substring_match():
# "critical" contains "critical" → 90
assert _score_priority(["critical-bug"], []) == 90
# ---------------------------------------------------------------------------
# _choose_agent
# ---------------------------------------------------------------------------
def test_choose_claude_for_architecture():
target, rationale = _choose_agent("Refactor auth middleware", "", [])
assert target == AgentTarget.CLAUDE
assert "complex" in rationale or "high-complexity" in rationale
def test_choose_kimi_for_research():
target, rationale = _choose_agent("Deep research on embedding models", "", [])
assert target == AgentTarget.KIMI
def test_choose_timmy_for_docs():
target, rationale = _choose_agent("Update documentation for CLI", "", [])
assert target == AgentTarget.TIMMY
def test_choose_timmy_default():
target, rationale = _choose_agent("Fix typo in README", "simple change", [])
# Could route to timmy (docs/trivial) or default — either is valid
assert isinstance(target, AgentTarget)
def test_choose_agent_label_wins():
# "security" label → Claude
target, _ = _choose_agent("Login page", "", ["security"])
assert target == AgentTarget.CLAUDE
# ---------------------------------------------------------------------------
# triage_issues
# ---------------------------------------------------------------------------
def _make_raw_issue(
number: int,
title: str,
body: str = "",
labels: list[str] | None = None,
assignees: list[str] | None = None,
) -> dict:
return {
"number": number,
"title": title,
"body": body,
"labels": [{"name": lbl} for lbl in (labels or [])],
"assignees": [{"login": a} for a in (assignees or [])],
"html_url": f"http://gitea/issues/{number}",
}
def test_triage_returns_sorted_by_priority():
issues = [
_make_raw_issue(1, "Routine docs update", labels=["docs"]),
_make_raw_issue(2, "Critical security issue", labels=["urgent", "security"]),
_make_raw_issue(3, "Normal feature", labels=[]),
]
triaged = triage_issues(issues)
# Highest priority first
assert triaged[0].number == 2
assert triaged[0].priority_score == 100 # urgent label
def test_triage_prs_can_be_included():
# triage_issues does not filter PRs — that's fetch_open_issues's job
issues = [_make_raw_issue(10, "A PR-like issue")]
triaged = triage_issues(issues)
assert len(triaged) == 1
def test_triage_empty():
assert triage_issues([]) == []
def test_triage_routing():
issues = [
_make_raw_issue(1, "Benchmark LLM backends", body="comprehensive analysis"),
_make_raw_issue(2, "Refactor agent loader", body="architecture change"),
_make_raw_issue(3, "Fix typo in docs", labels=["docs"]),
]
triaged = {i.number: i for i in triage_issues(issues)}
assert triaged[1].agent_target == AgentTarget.KIMI
assert triaged[2].agent_target == AgentTarget.CLAUDE
assert triaged[3].agent_target == AgentTarget.TIMMY
def test_triage_preserves_url():
issues = [_make_raw_issue(42, "Some issue")]
triaged = triage_issues(issues)
assert triaged[0].url == "http://gitea/issues/42"
# ---------------------------------------------------------------------------
# fetch_open_issues — no Gitea available in unit tests
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_fetch_open_issues_returns_empty_when_disabled(monkeypatch):
"""When Gitea is disabled, fetch returns [] without raising."""
import timmy.vassal.backlog as bl
# Patch settings
class FakeSettings:
gitea_enabled = False
gitea_token = ""
gitea_url = "http://localhost:3000"
gitea_repo = "owner/repo"
monkeypatch.setattr(bl, "logger", bl.logger) # no-op just to confirm import
# We can't easily monkeypatch `from config import settings` inside the function,
# so test the no-token path via environment
import os
original = os.environ.pop("GITEA_TOKEN", None)
try:
result = await bl.fetch_open_issues()
# Should return [] gracefully (no token configured by default in test env)
assert isinstance(result, list)
finally:
if original is not None:
os.environ["GITEA_TOKEN"] = original

View File

@@ -1,114 +0,0 @@
"""Unit tests for timmy.vassal.dispatch — routing and label helpers."""
from __future__ import annotations
import pytest
from timmy.vassal.backlog import AgentTarget, TriagedIssue
from timmy.vassal.dispatch import (
DispatchRecord,
clear_dispatch_registry,
get_dispatch_registry,
)
def _make_triaged(
number: int,
title: str,
agent: AgentTarget,
priority: int = 50,
) -> TriagedIssue:
return TriagedIssue(
number=number,
title=title,
body="",
agent_target=agent,
priority_score=priority,
rationale="test rationale",
url=f"http://gitea/issues/{number}",
)
# ---------------------------------------------------------------------------
# Registry helpers
# ---------------------------------------------------------------------------
def test_registry_starts_empty():
clear_dispatch_registry()
assert get_dispatch_registry() == {}
def test_registry_returns_copy():
clear_dispatch_registry()
reg = get_dispatch_registry()
reg[999] = None # type: ignore[assignment]
assert 999 not in get_dispatch_registry()
# ---------------------------------------------------------------------------
# dispatch_issue — Timmy self-dispatch (no Gitea required)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_dispatch_timmy_self_no_gitea():
"""Timmy self-dispatch records without hitting Gitea."""
clear_dispatch_registry()
issue = _make_triaged(1, "Fix docs typo", AgentTarget.TIMMY)
from timmy.vassal.dispatch import dispatch_issue
record = await dispatch_issue(issue)
assert isinstance(record, DispatchRecord)
assert record.issue_number == 1
assert record.agent == AgentTarget.TIMMY
assert 1 in get_dispatch_registry()
@pytest.mark.asyncio
async def test_dispatch_claude_no_gitea_token():
"""Claude dispatch gracefully degrades when Gitea token is absent."""
clear_dispatch_registry()
issue = _make_triaged(2, "Refactor auth", AgentTarget.CLAUDE)
from timmy.vassal.dispatch import dispatch_issue
record = await dispatch_issue(issue)
assert record.issue_number == 2
assert record.agent == AgentTarget.CLAUDE
# label/comment not applied — no token
assert record.label_applied is False
assert 2 in get_dispatch_registry()
@pytest.mark.asyncio
async def test_dispatch_kimi_no_gitea_token():
clear_dispatch_registry()
issue = _make_triaged(3, "Research embeddings", AgentTarget.KIMI)
from timmy.vassal.dispatch import dispatch_issue
record = await dispatch_issue(issue)
assert record.agent == AgentTarget.KIMI
assert record.label_applied is False
# ---------------------------------------------------------------------------
# DispatchRecord fields
# ---------------------------------------------------------------------------
def test_dispatch_record_defaults():
r = DispatchRecord(
issue_number=5,
issue_title="Test issue",
agent=AgentTarget.TIMMY,
rationale="because",
)
assert r.label_applied is False
assert r.comment_posted is False
assert r.dispatched_at # has a timestamp

View File

@@ -1,116 +0,0 @@
"""Unit tests for timmy.vassal.house_health."""
from __future__ import annotations
import pytest
from timmy.vassal.house_health import (
DiskUsage,
MemoryUsage,
OllamaHealth,
SystemSnapshot,
_probe_disk,
)
# ---------------------------------------------------------------------------
# Data model tests
# ---------------------------------------------------------------------------
def test_system_snapshot_healthy_when_no_warnings():
snap = SystemSnapshot()
assert snap.healthy is True
def test_system_snapshot_unhealthy_with_warnings():
snap = SystemSnapshot(warnings=["disk 90% full"])
assert snap.healthy is False
def test_disk_usage_defaults():
d = DiskUsage()
assert d.percent_used == 0.0
assert d.path == "/"
def test_memory_usage_defaults():
m = MemoryUsage()
assert m.percent_used == 0.0
def test_ollama_health_defaults():
o = OllamaHealth()
assert o.reachable is False
assert o.loaded_models == []
# ---------------------------------------------------------------------------
# _probe_disk — runs against real filesystem
# ---------------------------------------------------------------------------
def test_probe_disk_root():
result = _probe_disk("/")
assert result.total_gb > 0
assert 0.0 <= result.percent_used <= 100.0
assert result.free_gb >= 0
def test_probe_disk_bad_path():
result = _probe_disk("/nonexistent_path_xyz")
# Should not raise — returns zeroed DiskUsage
assert result.percent_used == 0.0
# ---------------------------------------------------------------------------
# get_system_snapshot — async
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_get_system_snapshot_returns_snapshot():
from timmy.vassal.house_health import get_system_snapshot
snap = await get_system_snapshot()
assert isinstance(snap, SystemSnapshot)
# Disk is always probed
assert snap.disk.total_gb >= 0
# Ollama is likely unreachable in test env — that's fine
assert isinstance(snap.ollama, OllamaHealth)
@pytest.mark.asyncio
async def test_get_system_snapshot_disk_warning(monkeypatch):
"""When disk is above threshold, a warning is generated."""
import timmy.vassal.house_health as hh
# Patch _probe_disk to return high usage
def _full_disk(path: str) -> DiskUsage:
return DiskUsage(
path=path,
total_gb=100.0,
used_gb=90.0,
free_gb=10.0,
percent_used=90.0,
)
monkeypatch.setattr(hh, "_probe_disk", _full_disk)
snap = await hh.get_system_snapshot()
assert any("disk" in w.lower() or "Disk" in w for w in snap.warnings)
# ---------------------------------------------------------------------------
# cleanup_stale_files — temp dir test
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_cleanup_stale_files_missing_dir():
"""Should not raise when the target dir doesn't exist."""
from timmy.vassal.house_health import cleanup_stale_files
result = await cleanup_stale_files(temp_dirs=["/tmp/timmy_test_xyz_nonexistent"])
assert result["deleted_count"] == 0
assert result["errors"] == []

View File

@@ -1,139 +0,0 @@
"""Unit tests for timmy.vassal.orchestration_loop — VassalOrchestrator."""
from __future__ import annotations
import pytest
from timmy.vassal.orchestration_loop import VassalCycleRecord, VassalOrchestrator
# ---------------------------------------------------------------------------
# VassalCycleRecord
# ---------------------------------------------------------------------------
def test_cycle_record_healthy_when_no_errors():
r = VassalCycleRecord(
cycle_id=1,
started_at="2026-01-01T00:00:00+00:00",
)
assert r.healthy is True
def test_cycle_record_unhealthy_with_errors():
r = VassalCycleRecord(
cycle_id=1,
started_at="2026-01-01T00:00:00+00:00",
errors=["backlog: connection refused"],
)
assert r.healthy is False
def test_cycle_record_unhealthy_with_warnings():
r = VassalCycleRecord(
cycle_id=1,
started_at="2026-01-01T00:00:00+00:00",
house_warnings=["disk 90% full"],
)
assert r.healthy is False
# ---------------------------------------------------------------------------
# VassalOrchestrator state
# ---------------------------------------------------------------------------
def test_orchestrator_initial_state():
orch = VassalOrchestrator()
assert orch.cycle_count == 0
assert orch.is_running is False
assert orch.history == []
def test_orchestrator_get_status_no_cycles():
orch = VassalOrchestrator()
status = orch.get_status()
assert status["running"] is False
assert status["cycle_count"] == 0
assert status["last_cycle"] is None
# ---------------------------------------------------------------------------
# run_cycle — integration (no Gitea, no Ollama in test env)
# ---------------------------------------------------------------------------
@pytest.mark.asyncio
async def test_run_cycle_completes_without_services():
"""run_cycle must complete and record even when external services are down."""
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
orch = VassalOrchestrator(cycle_interval=300)
record = await orch.run_cycle()
assert isinstance(record, VassalCycleRecord)
assert record.cycle_id == 1
assert record.finished_at # was set
assert record.duration_ms >= 0
# No Gitea → fetched = 0, dispatched = 0
assert record.issues_fetched == 0
assert record.issues_dispatched == 0
# History updated
assert len(orch.history) == 1
assert orch.cycle_count == 1
@pytest.mark.asyncio
async def test_run_cycle_increments_cycle_count():
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
orch = VassalOrchestrator()
await orch.run_cycle()
await orch.run_cycle()
assert orch.cycle_count == 2
assert len(orch.history) == 2
@pytest.mark.asyncio
async def test_get_status_after_cycle():
from timmy.vassal.dispatch import clear_dispatch_registry
clear_dispatch_registry()
orch = VassalOrchestrator()
await orch.run_cycle()
status = orch.get_status()
assert status["cycle_count"] == 1
last = status["last_cycle"]
assert last is not None
assert last["cycle_id"] == 1
assert last["issues_fetched"] == 0
# ---------------------------------------------------------------------------
# start / stop
# ---------------------------------------------------------------------------
def test_orchestrator_stop_when_not_running():
"""stop() on an idle orchestrator must not raise."""
orch = VassalOrchestrator()
orch.stop() # should be a no-op
assert orch.is_running is False
# ---------------------------------------------------------------------------
# Module-level singleton
# ---------------------------------------------------------------------------
def test_module_singleton_exists():
from timmy.vassal import vassal_orchestrator, VassalOrchestrator
assert isinstance(vassal_orchestrator, VassalOrchestrator)

21
tox.ini
View File

@@ -47,10 +47,12 @@ commands =
# ── Test Environments ────────────────────────────────────────────────────────
[testenv:unit]
description = Fast unit tests — only tests marked @pytest.mark.unit
description = Fast tests — excludes e2e, functional, and external services
commands =
pytest tests/ -q --tb=short \
-m "unit and not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
--ignore=tests/e2e \
--ignore=tests/functional \
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow" \
-n auto --dist worksteal
[testenv:integration]
@@ -102,7 +104,7 @@ commands =
--cov-report=xml:reports/coverage.xml \
--cov-fail-under=73 \
--junitxml=reports/junit.xml \
-p no:xdist \
-n auto --dist worksteal \
-m "not ollama and not docker and not selenium and not external_api and not skip_ci and not slow"
[testenv:coverage]
@@ -113,7 +115,7 @@ commands =
--cov-report=term-missing \
--cov-report=xml \
--cov-fail-under=73 \
-p no:xdist \
-n auto --dist worksteal \
-m "not ollama and not docker and not selenium and not external_api and not slow"
[testenv:coverage-html]
@@ -124,7 +126,16 @@ commands =
--cov-report=term-missing \
--cov-report=html \
--cov-fail-under=73 \
-p no:xdist \
-n auto --dist worksteal \
-m "not ollama and not docker and not selenium and not external_api and not slow"
[testenv:coverage-parallel]
description = Parallel coverage report
commands =
pytest tests/ -q --tb=short \
--cov=src \
--cov-report=term-missing \
-n auto --dist worksteal \
-m "not ollama and not docker and not selenium and not external_api and not slow"
# ── Pre-push (mirrors CI exactly) ────────────────────────────────────────────