diff --git a/src/config.py b/src/config.py index 712e5750..5b7b7342 100644 --- a/src/config.py +++ b/src/config.py @@ -304,6 +304,16 @@ class Settings(BaseSettings): mcp_timeout: int = 15 mcp_bridge_timeout: int = 60 # HTTP timeout for MCP bridge Ollama calls (seconds) + # ── Backlog Triage Loop ──────────────────────────────────────────── + # Autonomous loop: fetch open issues, score, assign to agents. + backlog_triage_enabled: bool = False + # Seconds between triage cycles (default: 15 minutes). + backlog_triage_interval_seconds: int = 900 + # When True, score and summarize but don't write to Gitea. + backlog_triage_dry_run: bool = False + # Create a daily triage summary issue/comment. + backlog_triage_daily_summary: bool = True + # ── Loop QA (Self-Testing) ───────────────────────────────────────── # Self-test orchestrator that probes capabilities alongside the thinking loop. loop_qa_enabled: bool = True diff --git a/src/timmy/backlog_triage.py b/src/timmy/backlog_triage.py new file mode 100644 index 00000000..935da163 --- /dev/null +++ b/src/timmy/backlog_triage.py @@ -0,0 +1,759 @@ +"""Autonomous backlog triage loop — Timmy scans Gitea and assigns work. + +Continuously fetches open issues, scores/prioritizes them, and decides +what to work on next without waiting to be asked. + +Loop flow:: + + while true: + 1. Fetch all open issues from Gitea API + 2. Score/prioritize by labels, age, type, blocked status + 3. Identify unassigned high-priority items + 4. Decide: assign to claude, dispatch to kimi, or flag for Alex + 5. Execute the assignment (comment + assign) + 6. Optionally post a daily triage summary + 7. Sleep for configurable interval (default 15 min) + +Priority tiers: + P0 — security, data loss, blocking bugs → immediate action + P1 — core functionality, ready issues → next sprint + P2 — improvements, low-score issues → backlog + P3 — philosophy, meta → someday/never (skip in triage) + +Usage:: + + from timmy.backlog_triage import BacklogTriageLoop + + loop = BacklogTriageLoop() + await loop.run_once() # single triage cycle + await loop.start() # background daemon loop + loop.stop() # graceful shutdown +""" + +from __future__ import annotations + +import asyncio +import logging +import re +from dataclasses import dataclass, field +from datetime import UTC, datetime, timedelta +from typing import Any + +import httpx + +from config import settings + +logger = logging.getLogger(__name__) + +# ── Constants ──────────────────────────────────────────────────────────────── + +# Minimum triage score to be considered "ready" for assignment +READY_THRESHOLD = 5 + +# Agent Gitea logins +AGENT_CLAUDE = "claude" +AGENT_KIMI = "kimi" +OWNER_LOGIN = "rockachopa" # Alex — human owner + +# Labels +KIMI_READY_LABEL = "kimi-ready" +TRIAGE_DONE_LABEL = "triage-done" + +# Tag sets (mirrors scripts/triage_score.py) +_BUG_TAGS = frozenset({"bug", "broken", "crash", "error", "fix", "regression", "hotfix"}) +_FEATURE_TAGS = frozenset({"feature", "feat", "enhancement", "capability", "timmy-capability"}) +_REFACTOR_TAGS = frozenset({"refactor", "cleanup", "tech-debt", "optimization", "perf"}) +_META_TAGS = frozenset({"philosophy", "soul-gap", "discussion", "question", "rfc"}) +_P0_TAGS = frozenset({"security", "data-loss", "blocking", "p0", "critical"}) +_RESEARCH_TAGS = frozenset({"research", "kimi-ready", "investigation", "spike"}) +_LOOP_TAG = "loop-generated" + +# Regex patterns for scoring +_TAG_RE = re.compile(r"\[([^\]]+)\]") +_FILE_RE = re.compile(r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE) +_FUNC_RE = re.compile(r"(?:def |class |function |method |`\w+\(\)`)", re.IGNORECASE) +_ACCEPT_RE = re.compile( + r"(?:should|must|expect|verify|assert|test.?case|acceptance|criteria" + r"|pass(?:es|ing)|fail(?:s|ing)|return(?:s)?|raise(?:s)?)", + re.IGNORECASE, +) +_TEST_RE = re.compile(r"(?:tox|pytest|test_\w+|\.test\.|assert\s)", re.IGNORECASE) +_BLOCKED_RE = re.compile(r"\bblock(?:ed|s|ing)\b", re.IGNORECASE) + + +# ── Data types ─────────────────────────────────────────────────────────────── + + +@dataclass +class ScoredIssue: + """A Gitea issue enriched with triage scoring.""" + + number: int + title: str + body: str + labels: list[str] + tags: set[str] + assignees: list[str] + created_at: datetime + issue_type: str # bug | feature | refactor | philosophy | research | unknown + + score: int = 0 + scope: int = 0 + acceptance: int = 0 + alignment: int = 0 + ready: bool = False + age_days: int = 0 + is_p0: bool = False + is_blocked: bool = False + + @property + def is_unassigned(self) -> bool: + return len(self.assignees) == 0 + + @property + def needs_kimi(self) -> bool: + return bool(self.tags & _RESEARCH_TAGS) or KIMI_READY_LABEL in self.labels + + +@dataclass +class TriageDecision: + """The outcome of a triage decision for a single issue.""" + + issue_number: int + action: str # "assign_claude" | "assign_kimi" | "flag_alex" | "skip" + reason: str + agent: str = "" # the agent assigned (login) + executed: bool = False + error: str = "" + + +@dataclass +class TriageCycleResult: + """Summary of one complete triage cycle.""" + + timestamp: str + total_open: int + scored: int + ready: int + decisions: list[TriageDecision] = field(default_factory=list) + errors: list[str] = field(default_factory=list) + duration_ms: int = 0 + + +# ── Scoring ────────────────────────────────────────────────────────────────── + + +def _extract_tags(title: str, labels: list[str]) -> set[str]: + """Pull tags from [bracket] title notation + Gitea label names.""" + tags: set[str] = set() + for m in _TAG_RE.finditer(title): + tags.add(m.group(1).lower().strip()) + for lbl in labels: + tags.add(lbl.lower().strip()) + return tags + + +def _score_scope(title: str, body: str, tags: set[str]) -> int: + """0–3: How well-scoped is this issue?""" + text = f"{title}\n{body}" + score = 0 + if _FILE_RE.search(text): + score += 1 + if _FUNC_RE.search(text): + score += 1 + clean = _TAG_RE.sub("", title).strip() + if len(clean) < 80: + score += 1 + if tags & _META_TAGS: + score = max(0, score - 2) + return min(3, score) + + +def _score_acceptance(title: str, body: str, tags: set[str]) -> int: + """0–3: Does this have clear acceptance criteria?""" + text = f"{title}\n{body}" + score = 0 + matches = len(_ACCEPT_RE.findall(text)) + if matches >= 3: + score += 2 + elif matches >= 1: + score += 1 + if _TEST_RE.search(text): + score += 1 + if re.search(r"##\s*(problem|solution|expected|actual|steps)", body, re.IGNORECASE): + score += 1 + if tags & _META_TAGS: + score = max(0, score - 1) + return min(3, score) + + +def _score_alignment(title: str, body: str, tags: set[str]) -> int: + """0–3: How aligned is this with the north star?""" + score = 0 + if tags & _BUG_TAGS: + return 3 + if tags & _REFACTOR_TAGS: + score += 2 + if tags & _FEATURE_TAGS: + score += 2 + if _LOOP_TAG in tags: + score += 1 + if tags & _META_TAGS: + score = 0 + return min(3, score) + + +def score_issue(issue: dict[str, Any]) -> ScoredIssue: + """Score and classify a raw Gitea issue dict.""" + number = issue["number"] + title = issue.get("title", "") + body = issue.get("body") or "" + label_names = [lbl["name"] for lbl in issue.get("labels", [])] + tags = _extract_tags(title, label_names) + assignees = [a["login"] for a in issue.get("assignees", [])] + + # Parse created_at + raw_ts = issue.get("created_at", "") + try: + created_at = datetime.fromisoformat(raw_ts.replace("Z", "+00:00")) + except (ValueError, AttributeError): + created_at = datetime.now(UTC) + age_days = (datetime.now(UTC) - created_at).days + + # Scores + scope = _score_scope(title, body, tags) + acceptance = _score_acceptance(title, body, tags) + alignment = _score_alignment(title, body, tags) + total = scope + acceptance + alignment + + # Classify + if tags & _BUG_TAGS: + issue_type = "bug" + elif tags & _RESEARCH_TAGS: + issue_type = "research" + elif tags & _FEATURE_TAGS: + issue_type = "feature" + elif tags & _REFACTOR_TAGS: + issue_type = "refactor" + elif tags & _META_TAGS: + issue_type = "philosophy" + else: + issue_type = "unknown" + + is_p0 = bool(tags & _P0_TAGS) or issue_type == "bug" + is_blocked = bool(_BLOCKED_RE.search(title) or _BLOCKED_RE.search(body)) + + return ScoredIssue( + number=number, + title=_TAG_RE.sub("", title).strip(), + body=body, + labels=label_names, + tags=tags, + assignees=assignees, + created_at=created_at, + issue_type=issue_type, + score=total, + scope=scope, + acceptance=acceptance, + alignment=alignment, + ready=total >= READY_THRESHOLD, + age_days=age_days, + is_p0=is_p0, + is_blocked=is_blocked, + ) + + +# ── Decision logic ─────────────────────────────────────────────────────────── + + +def decide(issue: ScoredIssue) -> TriageDecision: + """Decide what to do with an issue. + + Returns a TriageDecision with action, reason, and agent. + Decision is not yet executed — call execute_decision() for that. + """ + num = issue.number + + # Skip philosophy/meta — not dev-actionable + if issue.issue_type == "philosophy": + return TriageDecision( + issue_number=num, + action="skip", + reason="Philosophy/meta issue — not dev-actionable in the triage loop.", + ) + + # Skip already-assigned issues + if not issue.is_unassigned: + return TriageDecision( + issue_number=num, + action="skip", + reason=f"Already assigned to: {', '.join(issue.assignees)}.", + ) + + # Skip if not ready (low score) + if not issue.ready: + return TriageDecision( + issue_number=num, + action="skip", + reason=f"Score {issue.score} < {READY_THRESHOLD} threshold — needs more detail before assignment.", + ) + + # Blocked: flag for Alex + if issue.is_blocked: + return TriageDecision( + issue_number=num, + action="flag_alex", + agent=OWNER_LOGIN, + reason=( + "Issue appears blocked. Flagging for @rockachopa to unblock before autonomous assignment." + ), + ) + + # Research / Kimi-ready + if issue.needs_kimi: + return TriageDecision( + issue_number=num, + action="assign_kimi", + agent=AGENT_KIMI, + reason=( + f"Issue type '{issue.issue_type}' with research/investigation scope. " + f"Assigning kimi-ready label for Kimi agent to pick up." + ), + ) + + # P0 bugs and blocking issues → Claude immediately + if issue.is_p0: + return TriageDecision( + issue_number=num, + action="assign_claude", + agent=AGENT_CLAUDE, + reason=( + f"P0/{issue.issue_type} issue (score={issue.score}, age={issue.age_days}d). " + f"Assigning to Claude Code for immediate attention." + ), + ) + + # Everything else that is ready → Claude Code + return TriageDecision( + issue_number=num, + action="assign_claude", + agent=AGENT_CLAUDE, + reason=( + f"Unassigned ready issue (type={issue.issue_type}, score={issue.score}, " + f"age={issue.age_days}d). Assigning to Claude Code." + ), + ) + + +# ── Gitea API client ───────────────────────────────────────────────────────── + + +def _api_headers() -> dict[str, str]: + return { + "Authorization": f"token {settings.gitea_token}", + "Content-Type": "application/json", + "Accept": "application/json", + } + + +def _repo_url(path: str) -> str: + owner, repo = settings.gitea_repo.split("/", 1) + return f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/{path}" + + +async def fetch_open_issues(client: httpx.AsyncClient) -> list[dict[str, Any]]: + """Fetch all open issues from Gitea, paginating as needed.""" + all_issues: list[dict[str, Any]] = [] + page = 1 + while True: + url = _repo_url(f"issues?state=open&type=issues&limit=50&page={page}") + try: + resp = await client.get(url, headers=_api_headers()) + if resp.status_code != 200: + logger.warning("Gitea issues fetch failed (HTTP %s)", resp.status_code) + break + batch: list[dict[str, Any]] = resp.json() + if not batch: + break + all_issues.extend(batch) + if len(batch) < 50: + break + page += 1 + except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc: + logger.warning("Gitea connection error fetching issues: %s", exc) + break + return all_issues + + +async def post_comment( + client: httpx.AsyncClient, + issue_number: int, + body: str, +) -> bool: + """Post a comment on a Gitea issue. Returns True on success.""" + url = _repo_url(f"issues/{issue_number}/comments") + try: + resp = await client.post(url, headers=_api_headers(), json={"body": body}) + return resp.status_code in (200, 201) + except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc: + logger.warning("Failed to post comment on #%d: %s", issue_number, exc) + return False + + +async def assign_issue( + client: httpx.AsyncClient, + issue_number: int, + assignee: str, +) -> bool: + """Assign an issue to a Gitea user. Returns True on success.""" + url = _repo_url(f"issues/{issue_number}") + try: + resp = await client.patch( + url, + headers=_api_headers(), + json={"assignees": [assignee]}, + ) + return resp.status_code in (200, 201) + except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc: + logger.warning("Failed to assign #%d to %s: %s", issue_number, assignee, exc) + return False + + +async def add_label( + client: httpx.AsyncClient, + issue_number: int, + label_name: str, +) -> bool: + """Add a label to a Gitea issue by name (auto-creates if missing). Returns True on success.""" + owner, repo = settings.gitea_repo.split("/", 1) + labels_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/labels" + headers = _api_headers() + + try: + # Fetch existing labels + resp = await client.get(labels_url, headers=headers) + if resp.status_code != 200: + return False + existing = {lbl["name"]: lbl["id"] for lbl in resp.json()} + + if label_name in existing: + label_id = existing[label_name] + else: + # Auto-create the label + create_resp = await client.post( + labels_url, + headers=headers, + json={"name": label_name, "color": "#006b75"}, + ) + if create_resp.status_code not in (200, 201): + return False + label_id = create_resp.json()["id"] + + # Apply to the issue + apply_url = _repo_url(f"issues/{issue_number}/labels") + apply_resp = await client.post( + apply_url, headers=headers, json={"labels": [label_id]} + ) + return apply_resp.status_code in (200, 201) + + except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc: + logger.warning("Failed to add label %r to #%d: %s", label_name, issue_number, exc) + return False + + +# ── Decision execution ─────────────────────────────────────────────────────── + + +async def execute_decision( + client: httpx.AsyncClient, + decision: TriageDecision, + dry_run: bool = False, +) -> TriageDecision: + """Execute a triage decision — comment + assign/label. + + When dry_run=True, logs the decision but makes no Gitea API calls. + Returns the updated decision with executed=True on success. + """ + num = decision.issue_number + + if decision.action == "skip": + logger.debug("Triage skip #%d: %s", num, decision.reason) + decision.executed = True + return decision + + audit_comment = _build_audit_comment(decision) + + if dry_run: + logger.info( + "[DRY RUN] #%d → %s (%s): %s", + num, + decision.action, + decision.agent, + decision.reason, + ) + decision.executed = True + return decision + + # Post audit comment first (always, so Alex can see reasoning) + comment_ok = await post_comment(client, num, audit_comment) + if not comment_ok: + decision.error = "Failed to post audit comment" + logger.warning("Triage #%d: comment failed", num) + return decision + + # Execute assignment + ok = False + if decision.action == "assign_claude": + ok = await assign_issue(client, num, AGENT_CLAUDE) + elif decision.action == "assign_kimi": + ok = await add_label(client, num, KIMI_READY_LABEL) + elif decision.action == "flag_alex": + # Comment already posted above — that's sufficient for flagging + ok = True + + if ok: + decision.executed = True + logger.info("Triage #%d → %s OK", num, decision.action) + else: + decision.error = f"Action {decision.action!r} failed" + logger.warning("Triage #%d: action %r failed", num, decision.action) + + return decision + + +def _build_audit_comment(decision: TriageDecision) -> str: + """Build the audit trail comment that Alex can read to see reasoning.""" + ts = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") + action_text = { + "assign_claude": f"Assigning to @{AGENT_CLAUDE} for implementation.", + "assign_kimi": f"Adding `{KIMI_READY_LABEL}` label — queuing for Kimi research agent.", + "flag_alex": f"Flagging for @{OWNER_LOGIN} — issue appears blocked or needs human decision.", + }.get(decision.action, decision.action) + + return ( + f"**[Timmy Triage — {ts}]**\n\n" + f"**Decision:** {action_text}\n\n" + f"**Why:** {decision.reason}\n\n" + f"*Autonomous triage by Timmy. Reply to override.*" + ) + + +# ── Daily summary ───────────────────────────────────────────────────────────── + + +def _build_daily_summary(result: TriageCycleResult, scored: list[ScoredIssue]) -> str: + """Build the daily triage summary body.""" + now = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC") + assigned = [d for d in result.decisions if d.executed and d.action != "skip"] + skipped = [d for d in result.decisions if d.action == "skip"] + + lines = [ + f"# Timmy Backlog Triage — {now}", + "", + f"**Open issues:** {result.total_open} | " + f"**Scored:** {result.scored} | " + f"**Ready:** {result.ready} | " + f"**Assigned this cycle:** {len(assigned)}", + "", + "## Top 10 Ready Issues (by score)", + "", + ] + + top = sorted([s for s in scored if s.ready], key=lambda s: (-s.score, s.number))[:10] + for s in top: + flag = "🐛" if s.issue_type == "bug" else "⚡" if s.is_p0 else "✦" + lines.append( + f"- {flag} **#{s.number}** (score={s.score}, age={s.age_days}d) — {s.title[:80]}" + ) + + if assigned: + lines += ["", "## Actions Taken", ""] + for d in assigned: + lines.append(f"- #{d.issue_number} → `{d.action}` ({d.agent}): {d.reason[:100]}") + + if skipped: + lines += ["", f"## Skipped ({len(skipped)} issues)", ""] + for d in skipped[:5]: + lines.append(f"- #{d.issue_number}: {d.reason[:80]}") + if len(skipped) > 5: + lines.append(f"- … and {len(skipped) - 5} more") + + lines += [ + "", + "---", + "*Auto-generated by Timmy's backlog triage loop. " + "Override any decision by reassigning or commenting.*", + ] + return "\n".join(lines) + + +async def post_daily_summary( + client: httpx.AsyncClient, + result: TriageCycleResult, + scored: list[ScoredIssue], + dry_run: bool = False, +) -> bool: + """Post a daily triage summary as a new Gitea issue.""" + today = datetime.now(UTC).strftime("%Y-%m-%d") + title = f"[Triage] Daily backlog summary — {today}" + body = _build_daily_summary(result, scored) + + if dry_run: + logger.info("[DRY RUN] Would post daily summary: %s", title) + return True + + url = _repo_url("issues") + try: + resp = await client.post( + url, + headers=_api_headers(), + json={ + "title": title, + "body": body, + "labels": [], + }, + ) + if resp.status_code in (200, 201): + issue_num = resp.json().get("number", "?") + logger.info("Daily triage summary posted as issue #%s", issue_num) + return True + logger.warning("Daily summary post failed (HTTP %s)", resp.status_code) + return False + except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc: + logger.warning("Failed to post daily summary: %s", exc) + return False + + +# ── Main loop class ─────────────────────────────────────────────────────────── + + +class BacklogTriageLoop: + """Autonomous backlog triage loop. + + Fetches, scores, and assigns Gitea issues on a configurable interval. + + Parameters + ---------- + interval: + Seconds between triage cycles. Default: settings.backlog_triage_interval_seconds. + dry_run: + When True, score and log decisions but don't write to Gitea. + daily_summary: + When True, post a daily triage summary issue after each cycle. + """ + + def __init__( + self, + *, + interval: float | None = None, + dry_run: bool | None = None, + daily_summary: bool | None = None, + ) -> None: + self._interval = float(interval or settings.backlog_triage_interval_seconds) + self._dry_run = dry_run if dry_run is not None else settings.backlog_triage_dry_run + self._daily_summary = ( + daily_summary if daily_summary is not None else settings.backlog_triage_daily_summary + ) + self._running = False + self._task: asyncio.Task | None = None + self._cycle_count = 0 + self._last_summary_date: str = "" + self.history: list[TriageCycleResult] = [] + + @property + def is_running(self) -> bool: + return self._running + + @property + def cycle_count(self) -> int: + return self._cycle_count + + async def run_once(self) -> TriageCycleResult: + """Execute one full triage cycle. + + 1. Fetch all open Gitea issues + 2. Score and prioritize + 3. Decide on each unassigned ready issue + 4. Execute decisions + 5. Optionally post daily summary + """ + import time + + self._cycle_count += 1 + start = time.monotonic() + ts = datetime.now(UTC).isoformat() + result = TriageCycleResult(timestamp=ts, total_open=0, scored=0, ready=0) + + if not settings.gitea_enabled or not settings.gitea_token: + logger.warning("Backlog triage: Gitea not configured — skipping cycle") + return result + + async with httpx.AsyncClient(timeout=30) as client: + # 1. Fetch + raw_issues = await fetch_open_issues(client) + result.total_open = len(raw_issues) + logger.info("Triage cycle #%d: fetched %d open issues", self._cycle_count, len(raw_issues)) + + # 2. Score + scored = [score_issue(i) for i in raw_issues] + result.scored = len(scored) + result.ready = sum(1 for s in scored if s.ready) + + # 3 & 4. Decide and execute for each issue + for issue in scored: + decision = decide(issue) + if decision.action == "skip": + result.decisions.append(decision) + continue + decision = await execute_decision(client, decision, dry_run=self._dry_run) + result.decisions.append(decision) + + # Rate-limit: short pause between API writes to avoid hammering Gitea + if not self._dry_run: + await asyncio.sleep(0.5) + + # 5. Daily summary (once per UTC day) + today = datetime.now(UTC).strftime("%Y-%m-%d") + if self._daily_summary and today != self._last_summary_date: + await post_daily_summary(client, result, scored, dry_run=self._dry_run) + self._last_summary_date = today + + result.duration_ms = int((time.monotonic() - start) * 1000) + self.history.append(result) + + assigned_count = sum(1 for d in result.decisions if d.executed and d.action != "skip") + logger.info( + "Triage cycle #%d complete (%d ms): %d open, %d ready, %d assigned", + self._cycle_count, + result.duration_ms, + result.total_open, + result.ready, + assigned_count, + ) + return result + + async def start(self) -> None: + """Start the triage loop as a background task.""" + if self._running: + logger.warning("BacklogTriageLoop already running") + return + self._running = True + await self._loop() + + async def _loop(self) -> None: + logger.info( + "BacklogTriageLoop started (interval=%.0fs, dry_run=%s)", + self._interval, + self._dry_run, + ) + while self._running: + try: + await self.run_once() + except Exception: + logger.exception("Backlog triage cycle failed") + await asyncio.sleep(self._interval) + + def stop(self) -> None: + """Signal the loop to stop after the current cycle.""" + self._running = False + logger.info("BacklogTriageLoop stop requested") diff --git a/tests/unit/test_backlog_triage.py b/tests/unit/test_backlog_triage.py new file mode 100644 index 00000000..e5d758e2 --- /dev/null +++ b/tests/unit/test_backlog_triage.py @@ -0,0 +1,621 @@ +"""Unit tests for timmy.backlog_triage — autonomous backlog triage loop.""" + +from datetime import UTC, datetime +from unittest.mock import AsyncMock, MagicMock, patch + +import pytest + +from timmy.backlog_triage import ( + AGENT_CLAUDE, + AGENT_KIMI, + KIMI_READY_LABEL, + OWNER_LOGIN, + READY_THRESHOLD, + BacklogTriageLoop, + ScoredIssue, + TriageCycleResult, + TriageDecision, + _build_audit_comment, + _build_daily_summary, + _extract_tags, + _score_acceptance, + _score_alignment, + _score_scope, + decide, + score_issue, +) + + +# ── Fixtures ───────────────────────────────────────────────────────────────── + + +def _make_raw_issue( + number: int = 1, + title: str = "Fix the login bug", + body: str = "## Problem\nLogin fails on empty password.\n\n## Steps\nassert response == 200", + labels: list | None = None, + assignees: list | None = None, + created_at: str = "2026-03-20T10:00:00Z", +) -> dict: + return { + "number": number, + "title": title, + "body": body, + "labels": [{"name": lbl} for lbl in (labels or [])], + "assignees": [{"login": a} for a in (assignees or [])], + "created_at": created_at, + } + + +def _make_scored_issue( + number: int = 1, + title: str = "Fix login bug", + issue_type: str = "bug", + score: int = 7, + ready: bool = True, + is_p0: bool = True, + is_blocked: bool = False, + assignees: list | None = None, + tags: set | None = None, + labels: list | None = None, + age_days: int = 3, +) -> ScoredIssue: + return ScoredIssue( + number=number, + title=title, + body="", + labels=labels or [], + tags=tags or {"bug"}, + assignees=assignees or [], + created_at=datetime.now(UTC), + issue_type=issue_type, + score=score, + scope=2, + acceptance=2, + alignment=3, + ready=ready, + age_days=age_days, + is_p0=is_p0, + is_blocked=is_blocked, + ) + + +# ── _extract_tags ───────────────────────────────────────────────────────────── + + +class TestExtractTags: + def test_bracket_tags_in_title(self): + tags = _extract_tags("[Bug] Login fails", []) + assert "bug" in tags + + def test_multiple_brackets(self): + tags = _extract_tags("[Bug][P0] Crash on startup", []) + assert "bug" in tags + assert "p0" in tags + + def test_label_names(self): + tags = _extract_tags("Fix thing", ["security", "hotfix"]) + assert "security" in tags + assert "hotfix" in tags + + def test_labels_lowercased(self): + tags = _extract_tags("Title", ["Bug", "FEATURE"]) + assert "bug" in tags + assert "feature" in tags + + def test_empty_inputs(self): + tags = _extract_tags("", []) + assert tags == set() + + +# ── Scoring functions ───────────────────────────────────────────────────────── + + +class TestScoreScope: + def test_file_reference_adds_point(self): + score = _score_scope("Fix auth", "Edit src/timmy/auth.py", set()) + assert score >= 1 + + def test_function_reference_adds_point(self): + score = _score_scope("Fix auth", "def validate_token()", set()) + assert score >= 1 + + def test_short_title_adds_point(self): + score = _score_scope("Short title", "", set()) + assert score >= 1 + + def test_meta_tag_penalizes(self): + score = _score_scope("Discussion about philosophy", "long body " * 5, {"philosophy"}) + assert score <= 1 + + def test_max_score_3(self): + score = _score_scope("Fix auth", "src/auth.py\ndef login()", set()) + assert score <= 3 + + +class TestScoreAcceptance: + def test_acceptance_keywords(self): + body = "should return 200\nmust pass tests\nexpect response" + score = _score_acceptance("Title", body, set()) + assert score >= 2 + + def test_test_reference_adds_point(self): + score = _score_acceptance("Title", "Run tox -e unit", set()) + assert score >= 1 + + def test_structured_sections(self): + body = "## Problem\nX\n## Solution\nY" + score = _score_acceptance("Title", body, set()) + assert score >= 1 + + def test_meta_tag_penalizes(self): + score = _score_acceptance("Title", "should do something", {"philosophy"}) + # still counts but penalized + assert score <= 2 + + def test_empty_body(self): + score = _score_acceptance("Title", "", set()) + assert score == 0 + + +class TestScoreAlignment: + def test_bug_tags_score_max(self): + assert _score_alignment("", "", {"bug"}) == 3 + + def test_hotfix_tag_max(self): + assert _score_alignment("", "", {"hotfix"}) == 3 + + def test_refactor_tag(self): + score = _score_alignment("", "", {"refactor"}) + assert score >= 2 + + def test_feature_tag(self): + score = _score_alignment("", "", {"feature"}) + assert score >= 2 + + def test_meta_tags_zero(self): + assert _score_alignment("", "", {"philosophy"}) == 0 + + def test_loop_generated_bonus(self): + score = _score_alignment("", "", {"loop-generated"}) + assert score >= 1 + + +# ── score_issue ─────────────────────────────────────────────────────────────── + + +class TestScoreIssue: + def test_bug_issue_classified_correctly(self): + raw = _make_raw_issue(labels=["bug"], title="[Bug] Crash on startup") + scored = score_issue(raw) + assert scored.issue_type == "bug" + assert scored.is_p0 is True + + def test_feature_issue_classified(self): + raw = _make_raw_issue(labels=["feature"], title="Add voice support") + scored = score_issue(raw) + assert scored.issue_type == "feature" + + def test_philosophy_issue_classified(self): + raw = _make_raw_issue(labels=["philosophy"], title="[Philosophy] Should Timmy sleep?") + scored = score_issue(raw) + assert scored.issue_type == "philosophy" + + def test_research_issue_classified(self): + raw = _make_raw_issue(labels=["research"], title="Investigate model options") + scored = score_issue(raw) + assert scored.issue_type == "research" + + def test_ready_flag_set_when_score_high(self): + body = ( + "## Problem\nX breaks.\n## Solution\nFix src/timmy/agent.py def run()\n" + "should return True\nmust pass tox -e unit" + ) + raw = _make_raw_issue(labels=["bug"], body=body) + scored = score_issue(raw) + assert scored.score >= READY_THRESHOLD + assert scored.ready is True + + def test_is_blocked_detected_in_body(self): + raw = _make_raw_issue(body="This is blocked by issue #50") + scored = score_issue(raw) + assert scored.is_blocked is True + + def test_is_blocked_detected_in_title(self): + raw = _make_raw_issue(title="[blocking] Cannot proceed") + scored = score_issue(raw) + # "blocking" in brackets becomes a tag + assert scored.is_blocked is True + + def test_unassigned_when_no_assignees(self): + raw = _make_raw_issue(assignees=[]) + scored = score_issue(raw) + assert scored.is_unassigned is True + + def test_assigned_when_has_assignee(self): + raw = _make_raw_issue(assignees=["claude"]) + scored = score_issue(raw) + assert scored.is_unassigned is False + + def test_age_days_computed(self): + old_ts = "2026-01-01T00:00:00Z" + raw = _make_raw_issue(created_at=old_ts) + scored = score_issue(raw) + assert scored.age_days > 0 + + def test_needs_kimi_for_research_label(self): + raw = _make_raw_issue(labels=["kimi-ready"]) + scored = score_issue(raw) + assert scored.needs_kimi is True + + +# ── decide ──────────────────────────────────────────────────────────────────── + + +class TestDecide: + def test_philosophy_skipped(self): + issue = _make_scored_issue(issue_type="philosophy", tags={"philosophy"}) + d = decide(issue) + assert d.action == "skip" + assert "philosophy" in d.reason.lower() + + def test_assigned_issue_skipped(self): + issue = _make_scored_issue(assignees=["perplexity"]) + d = decide(issue) + assert d.action == "skip" + assert "assigned" in d.reason.lower() + + def test_low_score_skipped(self): + issue = _make_scored_issue(score=2, ready=False) + d = decide(issue) + assert d.action == "skip" + assert "threshold" in d.reason.lower() + + def test_blocked_issue_flagged_for_alex(self): + issue = _make_scored_issue(is_blocked=True) + d = decide(issue) + assert d.action == "flag_alex" + assert d.agent == OWNER_LOGIN + + def test_research_issue_assigned_kimi(self): + issue = _make_scored_issue( + issue_type="research", + tags={"research"}, + is_p0=False, + is_blocked=False, + ) + d = decide(issue) + assert d.action == "assign_kimi" + assert d.agent == AGENT_KIMI + + def test_kimi_ready_label_assigns_kimi(self): + issue = _make_scored_issue( + issue_type="unknown", + tags={"kimi-ready"}, + labels=["kimi-ready"], + is_p0=False, + is_blocked=False, + ) + d = decide(issue) + assert d.action == "assign_kimi" + + def test_p0_bug_assigns_claude(self): + issue = _make_scored_issue(issue_type="bug", is_p0=True, is_blocked=False) + d = decide(issue) + assert d.action == "assign_claude" + assert d.agent == AGENT_CLAUDE + + def test_ready_feature_assigns_claude(self): + issue = _make_scored_issue( + issue_type="feature", + is_p0=False, + is_blocked=False, + tags={"feature"}, + ) + d = decide(issue) + assert d.action == "assign_claude" + assert d.agent == AGENT_CLAUDE + + def test_decision_has_reason(self): + issue = _make_scored_issue() + d = decide(issue) + assert len(d.reason) > 10 + + +# ── _build_audit_comment ────────────────────────────────────────────────────── + + +class TestBuildAuditComment: + def test_contains_timmy_triage_header(self): + d = TriageDecision(42, "assign_claude", "High priority bug", agent=AGENT_CLAUDE) + comment = _build_audit_comment(d) + assert "Timmy Triage" in comment + + def test_contains_issue_reason(self): + d = TriageDecision(42, "assign_claude", "Urgent P0 bug", agent=AGENT_CLAUDE) + comment = _build_audit_comment(d) + assert "Urgent P0 bug" in comment + + def test_assign_claude_mentions_agent(self): + d = TriageDecision(42, "assign_claude", "reason", agent=AGENT_CLAUDE) + comment = _build_audit_comment(d) + assert AGENT_CLAUDE in comment + + def test_assign_kimi_mentions_label(self): + d = TriageDecision(42, "assign_kimi", "reason", agent=AGENT_KIMI) + comment = _build_audit_comment(d) + assert KIMI_READY_LABEL in comment + + def test_flag_alex_mentions_owner(self): + d = TriageDecision(42, "flag_alex", "blocked", agent=OWNER_LOGIN) + comment = _build_audit_comment(d) + assert OWNER_LOGIN in comment + + def test_contains_override_note(self): + d = TriageDecision(42, "assign_claude", "reason", agent=AGENT_CLAUDE) + comment = _build_audit_comment(d) + assert "override" in comment.lower() + + +# ── _build_daily_summary ────────────────────────────────────────────────────── + + +class TestBuildDailySummary: + def _make_result(self, decisions=None) -> TriageCycleResult: + return TriageCycleResult( + timestamp=datetime.now(UTC).isoformat(), + total_open=10, + scored=8, + ready=5, + decisions=decisions or [], + ) + + def test_contains_open_count(self): + result = self._make_result() + scored = [_make_scored_issue(number=i, ready=True, score=6) for i in range(1, 4)] + summary = _build_daily_summary(result, scored) + assert "10" in summary # total_open + + def test_contains_ready_count(self): + result = self._make_result() + summary = _build_daily_summary(result, []) + assert "5" in summary + + def test_actions_taken_section(self): + decisions = [ + TriageDecision(1, "assign_claude", "P0 bug", agent="claude", executed=True), + ] + result = self._make_result(decisions=decisions) + summary = _build_daily_summary(result, []) + assert "Actions Taken" in summary + assert "#1" in summary + + def test_top_issues_listed(self): + scored = [_make_scored_issue(number=99, ready=True, score=8)] + result = self._make_result() + summary = _build_daily_summary(result, scored) + assert "#99" in summary + + def test_footer_present(self): + summary = _build_daily_summary(self._make_result(), []) + assert "Auto-generated" in summary + + +# ── BacklogTriageLoop ───────────────────────────────────────────────────────── + + +class TestBacklogTriageLoop: + def test_default_interval_from_settings(self): + loop = BacklogTriageLoop() + from config import settings + + assert loop._interval == float(settings.backlog_triage_interval_seconds) + + def test_custom_interval(self): + loop = BacklogTriageLoop(interval=300) + assert loop._interval == 300.0 + + def test_dry_run_default(self): + loop = BacklogTriageLoop(dry_run=True) + assert loop._dry_run is True + + def test_not_running_initially(self): + loop = BacklogTriageLoop() + assert loop.is_running is False + + def test_stop_sets_running_false(self): + loop = BacklogTriageLoop() + loop._running = True + loop.stop() + assert loop._running is False + + def test_cycle_count_starts_zero(self): + loop = BacklogTriageLoop() + assert loop.cycle_count == 0 + + @pytest.mark.asyncio + async def test_run_once_skips_when_no_gitea_token(self): + loop = BacklogTriageLoop() + mock_settings = MagicMock() + mock_settings.gitea_enabled = True + mock_settings.gitea_token = "" + mock_settings.backlog_triage_interval_seconds = 900 + mock_settings.backlog_triage_dry_run = False + mock_settings.backlog_triage_daily_summary = False + + with patch("timmy.backlog_triage.settings", mock_settings): + result = await loop.run_once() + + assert result.total_open == 0 + + @pytest.mark.asyncio + async def test_run_once_dry_run_no_api_writes(self): + """In dry_run mode, decisions are made but no Gitea API writes happen.""" + loop = BacklogTriageLoop(dry_run=True, daily_summary=False) + + raw_issues = [ + _make_raw_issue( + number=10, + title="Fix crash", + labels=["bug"], + body=( + "## Problem\nCrash on login.\n## Solution\nFix src/auth.py " + "def login()\nshould return 200\nmust pass tox tests" + ), + ) + ] + + mock_settings = MagicMock() + mock_settings.gitea_enabled = True + mock_settings.gitea_token = "fake-token" + mock_settings.gitea_repo = "owner/repo" + mock_settings.gitea_url = "http://gitea.local" + mock_settings.backlog_triage_interval_seconds = 900 + mock_settings.backlog_triage_dry_run = True + mock_settings.backlog_triage_daily_summary = False + + mock_client = AsyncMock() + mock_client.get.return_value = MagicMock( + status_code=200, json=MagicMock(return_value=raw_issues) + ) + + mock_ctx = AsyncMock() + mock_ctx.__aenter__.return_value = mock_client + mock_ctx.__aexit__.return_value = False + + with ( + patch("timmy.backlog_triage.settings", mock_settings), + patch("httpx.AsyncClient", return_value=mock_ctx), + ): + result = await loop.run_once() + + # No POST/PATCH calls in dry run + mock_client.post.assert_not_called() + mock_client.patch.assert_not_called() + + assert result.total_open == 1 + assert loop.cycle_count == 1 + assert len(loop.history) == 1 + + @pytest.mark.asyncio + async def test_run_once_assigns_unassigned_bug(self): + """Unassigned ready bug should be assigned to Claude with audit comment.""" + loop = BacklogTriageLoop(dry_run=False, daily_summary=False) + + body = ( + "## Problem\nCrash on login.\n## Solution\nFix src/auth.py " + "def login()\nshould return 200\nmust pass tox tests" + ) + raw_issues = [_make_raw_issue(number=5, title="Fix crash", labels=["bug"], body=body)] + + mock_settings = MagicMock() + mock_settings.gitea_enabled = True + mock_settings.gitea_token = "fake-token" + mock_settings.gitea_repo = "owner/repo" + mock_settings.gitea_url = "http://gitea.local" + mock_settings.backlog_triage_interval_seconds = 900 + mock_settings.backlog_triage_dry_run = False + mock_settings.backlog_triage_daily_summary = False + + # GET /issues returns our issue + get_issues_resp = MagicMock(status_code=200) + get_issues_resp.json.return_value = raw_issues + + # POST /comments returns success + comment_resp = MagicMock(status_code=201) + comment_resp.json.return_value = {"id": 1} + + # PATCH /issues/{n} (assign) returns success + assign_resp = MagicMock(status_code=200) + assign_resp.json.return_value = {"number": 5} + + mock_client = AsyncMock() + mock_client.get.return_value = get_issues_resp + mock_client.post.return_value = comment_resp + mock_client.patch.return_value = assign_resp + + mock_ctx = AsyncMock() + mock_ctx.__aenter__.return_value = mock_client + mock_ctx.__aexit__.return_value = False + + with ( + patch("timmy.backlog_triage.settings", mock_settings), + patch("httpx.AsyncClient", return_value=mock_ctx), + patch("asyncio.sleep", new_callable=AsyncMock), + ): + result = await loop.run_once() + + assert result.total_open == 1 + # Comment should have been posted + mock_client.post.assert_called() + # Assign should have been called (PATCH) + mock_client.patch.assert_called() + + @pytest.mark.asyncio + async def test_run_once_skips_already_assigned(self): + """Issues already assigned should not be acted upon.""" + loop = BacklogTriageLoop(dry_run=False, daily_summary=False) + + raw_issues = [ + _make_raw_issue( + number=3, + labels=["bug"], + assignees=["perplexity"], + body="## Problem\nX\nmust pass tox\nshould return 200 at least 3 times", + ) + ] + + mock_settings = MagicMock() + mock_settings.gitea_enabled = True + mock_settings.gitea_token = "tok" + mock_settings.gitea_repo = "owner/repo" + mock_settings.gitea_url = "http://gitea.local" + mock_settings.backlog_triage_interval_seconds = 900 + mock_settings.backlog_triage_dry_run = False + mock_settings.backlog_triage_daily_summary = False + + get_resp = MagicMock(status_code=200) + get_resp.json.return_value = raw_issues + + mock_client = AsyncMock() + mock_client.get.return_value = get_resp + + mock_ctx = AsyncMock() + mock_ctx.__aenter__.return_value = mock_client + mock_ctx.__aexit__.return_value = False + + with ( + patch("timmy.backlog_triage.settings", mock_settings), + patch("httpx.AsyncClient", return_value=mock_ctx), + ): + result = await loop.run_once() + + # No writes for already-assigned issue + mock_client.post.assert_not_called() + mock_client.patch.assert_not_called() + assert result.decisions[0].action == "skip" + + +# ── ScoredIssue properties ──────────────────────────────────────────────────── + + +class TestScoredIssueProperties: + def test_is_unassigned_true_when_no_assignees(self): + issue = _make_scored_issue(assignees=[]) + assert issue.is_unassigned is True + + def test_is_unassigned_false_when_assigned(self): + issue = _make_scored_issue(assignees=["claude"]) + assert issue.is_unassigned is False + + def test_needs_kimi_for_research_tag(self): + issue = _make_scored_issue(tags={"research"}) + assert issue.needs_kimi is True + + def test_needs_kimi_for_kimi_ready_label(self): + issue = _make_scored_issue(labels=["kimi-ready"], tags=set()) + assert issue.needs_kimi is True + + def test_needs_kimi_false_for_bug(self): + issue = _make_scored_issue(tags={"bug"}, labels=[]) + assert issue.needs_kimi is False