[claude] Build Timmy autonomous backlog triage loop (#1071) (#1141)

This commit is contained in:
2026-03-23 18:32:27 +00:00
parent 0c627f175b
commit 4f8e86348c
3 changed files with 1390 additions and 0 deletions

View File

@@ -304,6 +304,16 @@ class Settings(BaseSettings):
mcp_timeout: int = 15
mcp_bridge_timeout: int = 60 # HTTP timeout for MCP bridge Ollama calls (seconds)
# ── Backlog Triage Loop ────────────────────────────────────────────
# Autonomous loop: fetch open issues, score, assign to agents.
backlog_triage_enabled: bool = False
# Seconds between triage cycles (default: 15 minutes).
backlog_triage_interval_seconds: int = 900
# When True, score and summarize but don't write to Gitea.
backlog_triage_dry_run: bool = False
# Create a daily triage summary issue/comment.
backlog_triage_daily_summary: bool = True
# ── Loop QA (Self-Testing) ─────────────────────────────────────────
# Self-test orchestrator that probes capabilities alongside the thinking loop.
loop_qa_enabled: bool = True

759
src/timmy/backlog_triage.py Normal file
View File

@@ -0,0 +1,759 @@
"""Autonomous backlog triage loop — Timmy scans Gitea and assigns work.
Continuously fetches open issues, scores/prioritizes them, and decides
what to work on next without waiting to be asked.
Loop flow::
while true:
1. Fetch all open issues from Gitea API
2. Score/prioritize by labels, age, type, blocked status
3. Identify unassigned high-priority items
4. Decide: assign to claude, dispatch to kimi, or flag for Alex
5. Execute the assignment (comment + assign)
6. Optionally post a daily triage summary
7. Sleep for configurable interval (default 15 min)
Priority tiers:
P0 — security, data loss, blocking bugs → immediate action
P1 — core functionality, ready issues → next sprint
P2 — improvements, low-score issues → backlog
P3 — philosophy, meta → someday/never (skip in triage)
Usage::
from timmy.backlog_triage import BacklogTriageLoop
loop = BacklogTriageLoop()
await loop.run_once() # single triage cycle
await loop.start() # background daemon loop
loop.stop() # graceful shutdown
"""
from __future__ import annotations
import asyncio
import logging
import re
from dataclasses import dataclass, field
from datetime import UTC, datetime, timedelta
from typing import Any
import httpx
from config import settings
logger = logging.getLogger(__name__)
# ── Constants ────────────────────────────────────────────────────────────────
# Minimum triage score to be considered "ready" for assignment
READY_THRESHOLD = 5
# Agent Gitea logins
AGENT_CLAUDE = "claude"
AGENT_KIMI = "kimi"
OWNER_LOGIN = "rockachopa" # Alex — human owner
# Labels
KIMI_READY_LABEL = "kimi-ready"
TRIAGE_DONE_LABEL = "triage-done"
# Tag sets (mirrors scripts/triage_score.py)
_BUG_TAGS = frozenset({"bug", "broken", "crash", "error", "fix", "regression", "hotfix"})
_FEATURE_TAGS = frozenset({"feature", "feat", "enhancement", "capability", "timmy-capability"})
_REFACTOR_TAGS = frozenset({"refactor", "cleanup", "tech-debt", "optimization", "perf"})
_META_TAGS = frozenset({"philosophy", "soul-gap", "discussion", "question", "rfc"})
_P0_TAGS = frozenset({"security", "data-loss", "blocking", "p0", "critical"})
_RESEARCH_TAGS = frozenset({"research", "kimi-ready", "investigation", "spike"})
_LOOP_TAG = "loop-generated"
# Regex patterns for scoring
_TAG_RE = re.compile(r"\[([^\]]+)\]")
_FILE_RE = re.compile(r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE)
_FUNC_RE = re.compile(r"(?:def |class |function |method |`\w+\(\)`)", re.IGNORECASE)
_ACCEPT_RE = re.compile(
r"(?:should|must|expect|verify|assert|test.?case|acceptance|criteria"
r"|pass(?:es|ing)|fail(?:s|ing)|return(?:s)?|raise(?:s)?)",
re.IGNORECASE,
)
_TEST_RE = re.compile(r"(?:tox|pytest|test_\w+|\.test\.|assert\s)", re.IGNORECASE)
_BLOCKED_RE = re.compile(r"\bblock(?:ed|s|ing)\b", re.IGNORECASE)
# ── Data types ───────────────────────────────────────────────────────────────
@dataclass
class ScoredIssue:
"""A Gitea issue enriched with triage scoring."""
number: int
title: str
body: str
labels: list[str]
tags: set[str]
assignees: list[str]
created_at: datetime
issue_type: str # bug | feature | refactor | philosophy | research | unknown
score: int = 0
scope: int = 0
acceptance: int = 0
alignment: int = 0
ready: bool = False
age_days: int = 0
is_p0: bool = False
is_blocked: bool = False
@property
def is_unassigned(self) -> bool:
return len(self.assignees) == 0
@property
def needs_kimi(self) -> bool:
return bool(self.tags & _RESEARCH_TAGS) or KIMI_READY_LABEL in self.labels
@dataclass
class TriageDecision:
"""The outcome of a triage decision for a single issue."""
issue_number: int
action: str # "assign_claude" | "assign_kimi" | "flag_alex" | "skip"
reason: str
agent: str = "" # the agent assigned (login)
executed: bool = False
error: str = ""
@dataclass
class TriageCycleResult:
"""Summary of one complete triage cycle."""
timestamp: str
total_open: int
scored: int
ready: int
decisions: list[TriageDecision] = field(default_factory=list)
errors: list[str] = field(default_factory=list)
duration_ms: int = 0
# ── Scoring ──────────────────────────────────────────────────────────────────
def _extract_tags(title: str, labels: list[str]) -> set[str]:
"""Pull tags from [bracket] title notation + Gitea label names."""
tags: set[str] = set()
for m in _TAG_RE.finditer(title):
tags.add(m.group(1).lower().strip())
for lbl in labels:
tags.add(lbl.lower().strip())
return tags
def _score_scope(title: str, body: str, tags: set[str]) -> int:
"""03: How well-scoped is this issue?"""
text = f"{title}\n{body}"
score = 0
if _FILE_RE.search(text):
score += 1
if _FUNC_RE.search(text):
score += 1
clean = _TAG_RE.sub("", title).strip()
if len(clean) < 80:
score += 1
if tags & _META_TAGS:
score = max(0, score - 2)
return min(3, score)
def _score_acceptance(title: str, body: str, tags: set[str]) -> int:
"""03: Does this have clear acceptance criteria?"""
text = f"{title}\n{body}"
score = 0
matches = len(_ACCEPT_RE.findall(text))
if matches >= 3:
score += 2
elif matches >= 1:
score += 1
if _TEST_RE.search(text):
score += 1
if re.search(r"##\s*(problem|solution|expected|actual|steps)", body, re.IGNORECASE):
score += 1
if tags & _META_TAGS:
score = max(0, score - 1)
return min(3, score)
def _score_alignment(title: str, body: str, tags: set[str]) -> int:
"""03: How aligned is this with the north star?"""
score = 0
if tags & _BUG_TAGS:
return 3
if tags & _REFACTOR_TAGS:
score += 2
if tags & _FEATURE_TAGS:
score += 2
if _LOOP_TAG in tags:
score += 1
if tags & _META_TAGS:
score = 0
return min(3, score)
def score_issue(issue: dict[str, Any]) -> ScoredIssue:
"""Score and classify a raw Gitea issue dict."""
number = issue["number"]
title = issue.get("title", "")
body = issue.get("body") or ""
label_names = [lbl["name"] for lbl in issue.get("labels", [])]
tags = _extract_tags(title, label_names)
assignees = [a["login"] for a in issue.get("assignees", [])]
# Parse created_at
raw_ts = issue.get("created_at", "")
try:
created_at = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
except (ValueError, AttributeError):
created_at = datetime.now(UTC)
age_days = (datetime.now(UTC) - created_at).days
# Scores
scope = _score_scope(title, body, tags)
acceptance = _score_acceptance(title, body, tags)
alignment = _score_alignment(title, body, tags)
total = scope + acceptance + alignment
# Classify
if tags & _BUG_TAGS:
issue_type = "bug"
elif tags & _RESEARCH_TAGS:
issue_type = "research"
elif tags & _FEATURE_TAGS:
issue_type = "feature"
elif tags & _REFACTOR_TAGS:
issue_type = "refactor"
elif tags & _META_TAGS:
issue_type = "philosophy"
else:
issue_type = "unknown"
is_p0 = bool(tags & _P0_TAGS) or issue_type == "bug"
is_blocked = bool(_BLOCKED_RE.search(title) or _BLOCKED_RE.search(body))
return ScoredIssue(
number=number,
title=_TAG_RE.sub("", title).strip(),
body=body,
labels=label_names,
tags=tags,
assignees=assignees,
created_at=created_at,
issue_type=issue_type,
score=total,
scope=scope,
acceptance=acceptance,
alignment=alignment,
ready=total >= READY_THRESHOLD,
age_days=age_days,
is_p0=is_p0,
is_blocked=is_blocked,
)
# ── Decision logic ───────────────────────────────────────────────────────────
def decide(issue: ScoredIssue) -> TriageDecision:
"""Decide what to do with an issue.
Returns a TriageDecision with action, reason, and agent.
Decision is not yet executed — call execute_decision() for that.
"""
num = issue.number
# Skip philosophy/meta — not dev-actionable
if issue.issue_type == "philosophy":
return TriageDecision(
issue_number=num,
action="skip",
reason="Philosophy/meta issue — not dev-actionable in the triage loop.",
)
# Skip already-assigned issues
if not issue.is_unassigned:
return TriageDecision(
issue_number=num,
action="skip",
reason=f"Already assigned to: {', '.join(issue.assignees)}.",
)
# Skip if not ready (low score)
if not issue.ready:
return TriageDecision(
issue_number=num,
action="skip",
reason=f"Score {issue.score} < {READY_THRESHOLD} threshold — needs more detail before assignment.",
)
# Blocked: flag for Alex
if issue.is_blocked:
return TriageDecision(
issue_number=num,
action="flag_alex",
agent=OWNER_LOGIN,
reason=(
"Issue appears blocked. Flagging for @rockachopa to unblock before autonomous assignment."
),
)
# Research / Kimi-ready
if issue.needs_kimi:
return TriageDecision(
issue_number=num,
action="assign_kimi",
agent=AGENT_KIMI,
reason=(
f"Issue type '{issue.issue_type}' with research/investigation scope. "
f"Assigning kimi-ready label for Kimi agent to pick up."
),
)
# P0 bugs and blocking issues → Claude immediately
if issue.is_p0:
return TriageDecision(
issue_number=num,
action="assign_claude",
agent=AGENT_CLAUDE,
reason=(
f"P0/{issue.issue_type} issue (score={issue.score}, age={issue.age_days}d). "
f"Assigning to Claude Code for immediate attention."
),
)
# Everything else that is ready → Claude Code
return TriageDecision(
issue_number=num,
action="assign_claude",
agent=AGENT_CLAUDE,
reason=(
f"Unassigned ready issue (type={issue.issue_type}, score={issue.score}, "
f"age={issue.age_days}d). Assigning to Claude Code."
),
)
# ── Gitea API client ─────────────────────────────────────────────────────────
def _api_headers() -> dict[str, str]:
return {
"Authorization": f"token {settings.gitea_token}",
"Content-Type": "application/json",
"Accept": "application/json",
}
def _repo_url(path: str) -> str:
owner, repo = settings.gitea_repo.split("/", 1)
return f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/{path}"
async def fetch_open_issues(client: httpx.AsyncClient) -> list[dict[str, Any]]:
"""Fetch all open issues from Gitea, paginating as needed."""
all_issues: list[dict[str, Any]] = []
page = 1
while True:
url = _repo_url(f"issues?state=open&type=issues&limit=50&page={page}")
try:
resp = await client.get(url, headers=_api_headers())
if resp.status_code != 200:
logger.warning("Gitea issues fetch failed (HTTP %s)", resp.status_code)
break
batch: list[dict[str, Any]] = resp.json()
if not batch:
break
all_issues.extend(batch)
if len(batch) < 50:
break
page += 1
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Gitea connection error fetching issues: %s", exc)
break
return all_issues
async def post_comment(
client: httpx.AsyncClient,
issue_number: int,
body: str,
) -> bool:
"""Post a comment on a Gitea issue. Returns True on success."""
url = _repo_url(f"issues/{issue_number}/comments")
try:
resp = await client.post(url, headers=_api_headers(), json={"body": body})
return resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to post comment on #%d: %s", issue_number, exc)
return False
async def assign_issue(
client: httpx.AsyncClient,
issue_number: int,
assignee: str,
) -> bool:
"""Assign an issue to a Gitea user. Returns True on success."""
url = _repo_url(f"issues/{issue_number}")
try:
resp = await client.patch(
url,
headers=_api_headers(),
json={"assignees": [assignee]},
)
return resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to assign #%d to %s: %s", issue_number, assignee, exc)
return False
async def add_label(
client: httpx.AsyncClient,
issue_number: int,
label_name: str,
) -> bool:
"""Add a label to a Gitea issue by name (auto-creates if missing). Returns True on success."""
owner, repo = settings.gitea_repo.split("/", 1)
labels_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/labels"
headers = _api_headers()
try:
# Fetch existing labels
resp = await client.get(labels_url, headers=headers)
if resp.status_code != 200:
return False
existing = {lbl["name"]: lbl["id"] for lbl in resp.json()}
if label_name in existing:
label_id = existing[label_name]
else:
# Auto-create the label
create_resp = await client.post(
labels_url,
headers=headers,
json={"name": label_name, "color": "#006b75"},
)
if create_resp.status_code not in (200, 201):
return False
label_id = create_resp.json()["id"]
# Apply to the issue
apply_url = _repo_url(f"issues/{issue_number}/labels")
apply_resp = await client.post(
apply_url, headers=headers, json={"labels": [label_id]}
)
return apply_resp.status_code in (200, 201)
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to add label %r to #%d: %s", label_name, issue_number, exc)
return False
# ── Decision execution ───────────────────────────────────────────────────────
async def execute_decision(
client: httpx.AsyncClient,
decision: TriageDecision,
dry_run: bool = False,
) -> TriageDecision:
"""Execute a triage decision — comment + assign/label.
When dry_run=True, logs the decision but makes no Gitea API calls.
Returns the updated decision with executed=True on success.
"""
num = decision.issue_number
if decision.action == "skip":
logger.debug("Triage skip #%d: %s", num, decision.reason)
decision.executed = True
return decision
audit_comment = _build_audit_comment(decision)
if dry_run:
logger.info(
"[DRY RUN] #%d%s (%s): %s",
num,
decision.action,
decision.agent,
decision.reason,
)
decision.executed = True
return decision
# Post audit comment first (always, so Alex can see reasoning)
comment_ok = await post_comment(client, num, audit_comment)
if not comment_ok:
decision.error = "Failed to post audit comment"
logger.warning("Triage #%d: comment failed", num)
return decision
# Execute assignment
ok = False
if decision.action == "assign_claude":
ok = await assign_issue(client, num, AGENT_CLAUDE)
elif decision.action == "assign_kimi":
ok = await add_label(client, num, KIMI_READY_LABEL)
elif decision.action == "flag_alex":
# Comment already posted above — that's sufficient for flagging
ok = True
if ok:
decision.executed = True
logger.info("Triage #%d%s OK", num, decision.action)
else:
decision.error = f"Action {decision.action!r} failed"
logger.warning("Triage #%d: action %r failed", num, decision.action)
return decision
def _build_audit_comment(decision: TriageDecision) -> str:
"""Build the audit trail comment that Alex can read to see reasoning."""
ts = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
action_text = {
"assign_claude": f"Assigning to @{AGENT_CLAUDE} for implementation.",
"assign_kimi": f"Adding `{KIMI_READY_LABEL}` label — queuing for Kimi research agent.",
"flag_alex": f"Flagging for @{OWNER_LOGIN} — issue appears blocked or needs human decision.",
}.get(decision.action, decision.action)
return (
f"**[Timmy Triage — {ts}]**\n\n"
f"**Decision:** {action_text}\n\n"
f"**Why:** {decision.reason}\n\n"
f"*Autonomous triage by Timmy. Reply to override.*"
)
# ── Daily summary ─────────────────────────────────────────────────────────────
def _build_daily_summary(result: TriageCycleResult, scored: list[ScoredIssue]) -> str:
"""Build the daily triage summary body."""
now = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
assigned = [d for d in result.decisions if d.executed and d.action != "skip"]
skipped = [d for d in result.decisions if d.action == "skip"]
lines = [
f"# Timmy Backlog Triage — {now}",
"",
f"**Open issues:** {result.total_open} | "
f"**Scored:** {result.scored} | "
f"**Ready:** {result.ready} | "
f"**Assigned this cycle:** {len(assigned)}",
"",
"## Top 10 Ready Issues (by score)",
"",
]
top = sorted([s for s in scored if s.ready], key=lambda s: (-s.score, s.number))[:10]
for s in top:
flag = "🐛" if s.issue_type == "bug" else "" if s.is_p0 else ""
lines.append(
f"- {flag} **#{s.number}** (score={s.score}, age={s.age_days}d) — {s.title[:80]}"
)
if assigned:
lines += ["", "## Actions Taken", ""]
for d in assigned:
lines.append(f"- #{d.issue_number} → `{d.action}` ({d.agent}): {d.reason[:100]}")
if skipped:
lines += ["", f"## Skipped ({len(skipped)} issues)", ""]
for d in skipped[:5]:
lines.append(f"- #{d.issue_number}: {d.reason[:80]}")
if len(skipped) > 5:
lines.append(f"- … and {len(skipped) - 5} more")
lines += [
"",
"---",
"*Auto-generated by Timmy's backlog triage loop. "
"Override any decision by reassigning or commenting.*",
]
return "\n".join(lines)
async def post_daily_summary(
client: httpx.AsyncClient,
result: TriageCycleResult,
scored: list[ScoredIssue],
dry_run: bool = False,
) -> bool:
"""Post a daily triage summary as a new Gitea issue."""
today = datetime.now(UTC).strftime("%Y-%m-%d")
title = f"[Triage] Daily backlog summary — {today}"
body = _build_daily_summary(result, scored)
if dry_run:
logger.info("[DRY RUN] Would post daily summary: %s", title)
return True
url = _repo_url("issues")
try:
resp = await client.post(
url,
headers=_api_headers(),
json={
"title": title,
"body": body,
"labels": [],
},
)
if resp.status_code in (200, 201):
issue_num = resp.json().get("number", "?")
logger.info("Daily triage summary posted as issue #%s", issue_num)
return True
logger.warning("Daily summary post failed (HTTP %s)", resp.status_code)
return False
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
logger.warning("Failed to post daily summary: %s", exc)
return False
# ── Main loop class ───────────────────────────────────────────────────────────
class BacklogTriageLoop:
"""Autonomous backlog triage loop.
Fetches, scores, and assigns Gitea issues on a configurable interval.
Parameters
----------
interval:
Seconds between triage cycles. Default: settings.backlog_triage_interval_seconds.
dry_run:
When True, score and log decisions but don't write to Gitea.
daily_summary:
When True, post a daily triage summary issue after each cycle.
"""
def __init__(
self,
*,
interval: float | None = None,
dry_run: bool | None = None,
daily_summary: bool | None = None,
) -> None:
self._interval = float(interval or settings.backlog_triage_interval_seconds)
self._dry_run = dry_run if dry_run is not None else settings.backlog_triage_dry_run
self._daily_summary = (
daily_summary if daily_summary is not None else settings.backlog_triage_daily_summary
)
self._running = False
self._task: asyncio.Task | None = None
self._cycle_count = 0
self._last_summary_date: str = ""
self.history: list[TriageCycleResult] = []
@property
def is_running(self) -> bool:
return self._running
@property
def cycle_count(self) -> int:
return self._cycle_count
async def run_once(self) -> TriageCycleResult:
"""Execute one full triage cycle.
1. Fetch all open Gitea issues
2. Score and prioritize
3. Decide on each unassigned ready issue
4. Execute decisions
5. Optionally post daily summary
"""
import time
self._cycle_count += 1
start = time.monotonic()
ts = datetime.now(UTC).isoformat()
result = TriageCycleResult(timestamp=ts, total_open=0, scored=0, ready=0)
if not settings.gitea_enabled or not settings.gitea_token:
logger.warning("Backlog triage: Gitea not configured — skipping cycle")
return result
async with httpx.AsyncClient(timeout=30) as client:
# 1. Fetch
raw_issues = await fetch_open_issues(client)
result.total_open = len(raw_issues)
logger.info("Triage cycle #%d: fetched %d open issues", self._cycle_count, len(raw_issues))
# 2. Score
scored = [score_issue(i) for i in raw_issues]
result.scored = len(scored)
result.ready = sum(1 for s in scored if s.ready)
# 3 & 4. Decide and execute for each issue
for issue in scored:
decision = decide(issue)
if decision.action == "skip":
result.decisions.append(decision)
continue
decision = await execute_decision(client, decision, dry_run=self._dry_run)
result.decisions.append(decision)
# Rate-limit: short pause between API writes to avoid hammering Gitea
if not self._dry_run:
await asyncio.sleep(0.5)
# 5. Daily summary (once per UTC day)
today = datetime.now(UTC).strftime("%Y-%m-%d")
if self._daily_summary and today != self._last_summary_date:
await post_daily_summary(client, result, scored, dry_run=self._dry_run)
self._last_summary_date = today
result.duration_ms = int((time.monotonic() - start) * 1000)
self.history.append(result)
assigned_count = sum(1 for d in result.decisions if d.executed and d.action != "skip")
logger.info(
"Triage cycle #%d complete (%d ms): %d open, %d ready, %d assigned",
self._cycle_count,
result.duration_ms,
result.total_open,
result.ready,
assigned_count,
)
return result
async def start(self) -> None:
"""Start the triage loop as a background task."""
if self._running:
logger.warning("BacklogTriageLoop already running")
return
self._running = True
await self._loop()
async def _loop(self) -> None:
logger.info(
"BacklogTriageLoop started (interval=%.0fs, dry_run=%s)",
self._interval,
self._dry_run,
)
while self._running:
try:
await self.run_once()
except Exception:
logger.exception("Backlog triage cycle failed")
await asyncio.sleep(self._interval)
def stop(self) -> None:
"""Signal the loop to stop after the current cycle."""
self._running = False
logger.info("BacklogTriageLoop stop requested")