forked from Rockachopa/Timmy-time-dashboard
760 lines
25 KiB
Python
760 lines
25 KiB
Python
"""Autonomous backlog triage loop — Timmy scans Gitea and assigns work.
|
||
|
||
Continuously fetches open issues, scores/prioritizes them, and decides
|
||
what to work on next without waiting to be asked.
|
||
|
||
Loop flow::
|
||
|
||
while true:
|
||
1. Fetch all open issues from Gitea API
|
||
2. Score/prioritize by labels, age, type, blocked status
|
||
3. Identify unassigned high-priority items
|
||
4. Decide: assign to claude, dispatch to kimi, or flag for Alex
|
||
5. Execute the assignment (comment + assign)
|
||
6. Optionally post a daily triage summary
|
||
7. Sleep for configurable interval (default 15 min)
|
||
|
||
Priority tiers:
|
||
P0 — security, data loss, blocking bugs → immediate action
|
||
P1 — core functionality, ready issues → next sprint
|
||
P2 — improvements, low-score issues → backlog
|
||
P3 — philosophy, meta → someday/never (skip in triage)
|
||
|
||
Usage::
|
||
|
||
from timmy.backlog_triage import BacklogTriageLoop
|
||
|
||
loop = BacklogTriageLoop()
|
||
await loop.run_once() # single triage cycle
|
||
await loop.start() # background daemon loop
|
||
loop.stop() # graceful shutdown
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import asyncio
|
||
import logging
|
||
import re
|
||
from dataclasses import dataclass, field
|
||
from datetime import UTC, datetime, timedelta
|
||
from typing import Any
|
||
|
||
import httpx
|
||
|
||
from config import settings
|
||
|
||
logger = logging.getLogger(__name__)
|
||
|
||
# ── Constants ────────────────────────────────────────────────────────────────
|
||
|
||
# Minimum triage score to be considered "ready" for assignment
|
||
READY_THRESHOLD = 5
|
||
|
||
# Agent Gitea logins
|
||
AGENT_CLAUDE = "claude"
|
||
AGENT_KIMI = "kimi"
|
||
OWNER_LOGIN = "rockachopa" # Alex — human owner
|
||
|
||
# Labels
|
||
KIMI_READY_LABEL = "kimi-ready"
|
||
TRIAGE_DONE_LABEL = "triage-done"
|
||
|
||
# Tag sets (mirrors scripts/triage_score.py)
|
||
_BUG_TAGS = frozenset({"bug", "broken", "crash", "error", "fix", "regression", "hotfix"})
|
||
_FEATURE_TAGS = frozenset({"feature", "feat", "enhancement", "capability", "timmy-capability"})
|
||
_REFACTOR_TAGS = frozenset({"refactor", "cleanup", "tech-debt", "optimization", "perf"})
|
||
_META_TAGS = frozenset({"philosophy", "soul-gap", "discussion", "question", "rfc"})
|
||
_P0_TAGS = frozenset({"security", "data-loss", "blocking", "p0", "critical"})
|
||
_RESEARCH_TAGS = frozenset({"research", "kimi-ready", "investigation", "spike"})
|
||
_LOOP_TAG = "loop-generated"
|
||
|
||
# Regex patterns for scoring
|
||
_TAG_RE = re.compile(r"\[([^\]]+)\]")
|
||
_FILE_RE = re.compile(r"(?:src/|tests/|scripts/|\.py|\.html|\.js|\.yaml|\.toml|\.sh)", re.IGNORECASE)
|
||
_FUNC_RE = re.compile(r"(?:def |class |function |method |`\w+\(\)`)", re.IGNORECASE)
|
||
_ACCEPT_RE = re.compile(
|
||
r"(?:should|must|expect|verify|assert|test.?case|acceptance|criteria"
|
||
r"|pass(?:es|ing)|fail(?:s|ing)|return(?:s)?|raise(?:s)?)",
|
||
re.IGNORECASE,
|
||
)
|
||
_TEST_RE = re.compile(r"(?:tox|pytest|test_\w+|\.test\.|assert\s)", re.IGNORECASE)
|
||
_BLOCKED_RE = re.compile(r"\bblock(?:ed|s|ing)\b", re.IGNORECASE)
|
||
|
||
|
||
# ── Data types ───────────────────────────────────────────────────────────────
|
||
|
||
|
||
@dataclass
|
||
class ScoredIssue:
|
||
"""A Gitea issue enriched with triage scoring."""
|
||
|
||
number: int
|
||
title: str
|
||
body: str
|
||
labels: list[str]
|
||
tags: set[str]
|
||
assignees: list[str]
|
||
created_at: datetime
|
||
issue_type: str # bug | feature | refactor | philosophy | research | unknown
|
||
|
||
score: int = 0
|
||
scope: int = 0
|
||
acceptance: int = 0
|
||
alignment: int = 0
|
||
ready: bool = False
|
||
age_days: int = 0
|
||
is_p0: bool = False
|
||
is_blocked: bool = False
|
||
|
||
@property
|
||
def is_unassigned(self) -> bool:
|
||
return len(self.assignees) == 0
|
||
|
||
@property
|
||
def needs_kimi(self) -> bool:
|
||
return bool(self.tags & _RESEARCH_TAGS) or KIMI_READY_LABEL in self.labels
|
||
|
||
|
||
@dataclass
|
||
class TriageDecision:
|
||
"""The outcome of a triage decision for a single issue."""
|
||
|
||
issue_number: int
|
||
action: str # "assign_claude" | "assign_kimi" | "flag_alex" | "skip"
|
||
reason: str
|
||
agent: str = "" # the agent assigned (login)
|
||
executed: bool = False
|
||
error: str = ""
|
||
|
||
|
||
@dataclass
|
||
class TriageCycleResult:
|
||
"""Summary of one complete triage cycle."""
|
||
|
||
timestamp: str
|
||
total_open: int
|
||
scored: int
|
||
ready: int
|
||
decisions: list[TriageDecision] = field(default_factory=list)
|
||
errors: list[str] = field(default_factory=list)
|
||
duration_ms: int = 0
|
||
|
||
|
||
# ── Scoring ──────────────────────────────────────────────────────────────────
|
||
|
||
|
||
def _extract_tags(title: str, labels: list[str]) -> set[str]:
|
||
"""Pull tags from [bracket] title notation + Gitea label names."""
|
||
tags: set[str] = set()
|
||
for m in _TAG_RE.finditer(title):
|
||
tags.add(m.group(1).lower().strip())
|
||
for lbl in labels:
|
||
tags.add(lbl.lower().strip())
|
||
return tags
|
||
|
||
|
||
def _score_scope(title: str, body: str, tags: set[str]) -> int:
|
||
"""0–3: How well-scoped is this issue?"""
|
||
text = f"{title}\n{body}"
|
||
score = 0
|
||
if _FILE_RE.search(text):
|
||
score += 1
|
||
if _FUNC_RE.search(text):
|
||
score += 1
|
||
clean = _TAG_RE.sub("", title).strip()
|
||
if len(clean) < 80:
|
||
score += 1
|
||
if tags & _META_TAGS:
|
||
score = max(0, score - 2)
|
||
return min(3, score)
|
||
|
||
|
||
def _score_acceptance(title: str, body: str, tags: set[str]) -> int:
|
||
"""0–3: Does this have clear acceptance criteria?"""
|
||
text = f"{title}\n{body}"
|
||
score = 0
|
||
matches = len(_ACCEPT_RE.findall(text))
|
||
if matches >= 3:
|
||
score += 2
|
||
elif matches >= 1:
|
||
score += 1
|
||
if _TEST_RE.search(text):
|
||
score += 1
|
||
if re.search(r"##\s*(problem|solution|expected|actual|steps)", body, re.IGNORECASE):
|
||
score += 1
|
||
if tags & _META_TAGS:
|
||
score = max(0, score - 1)
|
||
return min(3, score)
|
||
|
||
|
||
def _score_alignment(title: str, body: str, tags: set[str]) -> int:
|
||
"""0–3: How aligned is this with the north star?"""
|
||
score = 0
|
||
if tags & _BUG_TAGS:
|
||
return 3
|
||
if tags & _REFACTOR_TAGS:
|
||
score += 2
|
||
if tags & _FEATURE_TAGS:
|
||
score += 2
|
||
if _LOOP_TAG in tags:
|
||
score += 1
|
||
if tags & _META_TAGS:
|
||
score = 0
|
||
return min(3, score)
|
||
|
||
|
||
def score_issue(issue: dict[str, Any]) -> ScoredIssue:
|
||
"""Score and classify a raw Gitea issue dict."""
|
||
number = issue["number"]
|
||
title = issue.get("title", "")
|
||
body = issue.get("body") or ""
|
||
label_names = [lbl["name"] for lbl in issue.get("labels", [])]
|
||
tags = _extract_tags(title, label_names)
|
||
assignees = [a["login"] for a in issue.get("assignees", [])]
|
||
|
||
# Parse created_at
|
||
raw_ts = issue.get("created_at", "")
|
||
try:
|
||
created_at = datetime.fromisoformat(raw_ts.replace("Z", "+00:00"))
|
||
except (ValueError, AttributeError):
|
||
created_at = datetime.now(UTC)
|
||
age_days = (datetime.now(UTC) - created_at).days
|
||
|
||
# Scores
|
||
scope = _score_scope(title, body, tags)
|
||
acceptance = _score_acceptance(title, body, tags)
|
||
alignment = _score_alignment(title, body, tags)
|
||
total = scope + acceptance + alignment
|
||
|
||
# Classify
|
||
if tags & _BUG_TAGS:
|
||
issue_type = "bug"
|
||
elif tags & _RESEARCH_TAGS:
|
||
issue_type = "research"
|
||
elif tags & _FEATURE_TAGS:
|
||
issue_type = "feature"
|
||
elif tags & _REFACTOR_TAGS:
|
||
issue_type = "refactor"
|
||
elif tags & _META_TAGS:
|
||
issue_type = "philosophy"
|
||
else:
|
||
issue_type = "unknown"
|
||
|
||
is_p0 = bool(tags & _P0_TAGS) or issue_type == "bug"
|
||
is_blocked = bool(_BLOCKED_RE.search(title) or _BLOCKED_RE.search(body))
|
||
|
||
return ScoredIssue(
|
||
number=number,
|
||
title=_TAG_RE.sub("", title).strip(),
|
||
body=body,
|
||
labels=label_names,
|
||
tags=tags,
|
||
assignees=assignees,
|
||
created_at=created_at,
|
||
issue_type=issue_type,
|
||
score=total,
|
||
scope=scope,
|
||
acceptance=acceptance,
|
||
alignment=alignment,
|
||
ready=total >= READY_THRESHOLD,
|
||
age_days=age_days,
|
||
is_p0=is_p0,
|
||
is_blocked=is_blocked,
|
||
)
|
||
|
||
|
||
# ── Decision logic ───────────────────────────────────────────────────────────
|
||
|
||
|
||
def decide(issue: ScoredIssue) -> TriageDecision:
|
||
"""Decide what to do with an issue.
|
||
|
||
Returns a TriageDecision with action, reason, and agent.
|
||
Decision is not yet executed — call execute_decision() for that.
|
||
"""
|
||
num = issue.number
|
||
|
||
# Skip philosophy/meta — not dev-actionable
|
||
if issue.issue_type == "philosophy":
|
||
return TriageDecision(
|
||
issue_number=num,
|
||
action="skip",
|
||
reason="Philosophy/meta issue — not dev-actionable in the triage loop.",
|
||
)
|
||
|
||
# Skip already-assigned issues
|
||
if not issue.is_unassigned:
|
||
return TriageDecision(
|
||
issue_number=num,
|
||
action="skip",
|
||
reason=f"Already assigned to: {', '.join(issue.assignees)}.",
|
||
)
|
||
|
||
# Skip if not ready (low score)
|
||
if not issue.ready:
|
||
return TriageDecision(
|
||
issue_number=num,
|
||
action="skip",
|
||
reason=f"Score {issue.score} < {READY_THRESHOLD} threshold — needs more detail before assignment.",
|
||
)
|
||
|
||
# Blocked: flag for Alex
|
||
if issue.is_blocked:
|
||
return TriageDecision(
|
||
issue_number=num,
|
||
action="flag_alex",
|
||
agent=OWNER_LOGIN,
|
||
reason=(
|
||
"Issue appears blocked. Flagging for @rockachopa to unblock before autonomous assignment."
|
||
),
|
||
)
|
||
|
||
# Research / Kimi-ready
|
||
if issue.needs_kimi:
|
||
return TriageDecision(
|
||
issue_number=num,
|
||
action="assign_kimi",
|
||
agent=AGENT_KIMI,
|
||
reason=(
|
||
f"Issue type '{issue.issue_type}' with research/investigation scope. "
|
||
f"Assigning kimi-ready label for Kimi agent to pick up."
|
||
),
|
||
)
|
||
|
||
# P0 bugs and blocking issues → Claude immediately
|
||
if issue.is_p0:
|
||
return TriageDecision(
|
||
issue_number=num,
|
||
action="assign_claude",
|
||
agent=AGENT_CLAUDE,
|
||
reason=(
|
||
f"P0/{issue.issue_type} issue (score={issue.score}, age={issue.age_days}d). "
|
||
f"Assigning to Claude Code for immediate attention."
|
||
),
|
||
)
|
||
|
||
# Everything else that is ready → Claude Code
|
||
return TriageDecision(
|
||
issue_number=num,
|
||
action="assign_claude",
|
||
agent=AGENT_CLAUDE,
|
||
reason=(
|
||
f"Unassigned ready issue (type={issue.issue_type}, score={issue.score}, "
|
||
f"age={issue.age_days}d). Assigning to Claude Code."
|
||
),
|
||
)
|
||
|
||
|
||
# ── Gitea API client ─────────────────────────────────────────────────────────
|
||
|
||
|
||
def _api_headers() -> dict[str, str]:
|
||
return {
|
||
"Authorization": f"token {settings.gitea_token}",
|
||
"Content-Type": "application/json",
|
||
"Accept": "application/json",
|
||
}
|
||
|
||
|
||
def _repo_url(path: str) -> str:
|
||
owner, repo = settings.gitea_repo.split("/", 1)
|
||
return f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/{path}"
|
||
|
||
|
||
async def fetch_open_issues(client: httpx.AsyncClient) -> list[dict[str, Any]]:
|
||
"""Fetch all open issues from Gitea, paginating as needed."""
|
||
all_issues: list[dict[str, Any]] = []
|
||
page = 1
|
||
while True:
|
||
url = _repo_url(f"issues?state=open&type=issues&limit=50&page={page}")
|
||
try:
|
||
resp = await client.get(url, headers=_api_headers())
|
||
if resp.status_code != 200:
|
||
logger.warning("Gitea issues fetch failed (HTTP %s)", resp.status_code)
|
||
break
|
||
batch: list[dict[str, Any]] = resp.json()
|
||
if not batch:
|
||
break
|
||
all_issues.extend(batch)
|
||
if len(batch) < 50:
|
||
break
|
||
page += 1
|
||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||
logger.warning("Gitea connection error fetching issues: %s", exc)
|
||
break
|
||
return all_issues
|
||
|
||
|
||
async def post_comment(
|
||
client: httpx.AsyncClient,
|
||
issue_number: int,
|
||
body: str,
|
||
) -> bool:
|
||
"""Post a comment on a Gitea issue. Returns True on success."""
|
||
url = _repo_url(f"issues/{issue_number}/comments")
|
||
try:
|
||
resp = await client.post(url, headers=_api_headers(), json={"body": body})
|
||
return resp.status_code in (200, 201)
|
||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||
logger.warning("Failed to post comment on #%d: %s", issue_number, exc)
|
||
return False
|
||
|
||
|
||
async def assign_issue(
|
||
client: httpx.AsyncClient,
|
||
issue_number: int,
|
||
assignee: str,
|
||
) -> bool:
|
||
"""Assign an issue to a Gitea user. Returns True on success."""
|
||
url = _repo_url(f"issues/{issue_number}")
|
||
try:
|
||
resp = await client.patch(
|
||
url,
|
||
headers=_api_headers(),
|
||
json={"assignees": [assignee]},
|
||
)
|
||
return resp.status_code in (200, 201)
|
||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||
logger.warning("Failed to assign #%d to %s: %s", issue_number, assignee, exc)
|
||
return False
|
||
|
||
|
||
async def add_label(
|
||
client: httpx.AsyncClient,
|
||
issue_number: int,
|
||
label_name: str,
|
||
) -> bool:
|
||
"""Add a label to a Gitea issue by name (auto-creates if missing). Returns True on success."""
|
||
owner, repo = settings.gitea_repo.split("/", 1)
|
||
labels_url = f"{settings.gitea_url}/api/v1/repos/{owner}/{repo}/labels"
|
||
headers = _api_headers()
|
||
|
||
try:
|
||
# Fetch existing labels
|
||
resp = await client.get(labels_url, headers=headers)
|
||
if resp.status_code != 200:
|
||
return False
|
||
existing = {lbl["name"]: lbl["id"] for lbl in resp.json()}
|
||
|
||
if label_name in existing:
|
||
label_id = existing[label_name]
|
||
else:
|
||
# Auto-create the label
|
||
create_resp = await client.post(
|
||
labels_url,
|
||
headers=headers,
|
||
json={"name": label_name, "color": "#006b75"},
|
||
)
|
||
if create_resp.status_code not in (200, 201):
|
||
return False
|
||
label_id = create_resp.json()["id"]
|
||
|
||
# Apply to the issue
|
||
apply_url = _repo_url(f"issues/{issue_number}/labels")
|
||
apply_resp = await client.post(
|
||
apply_url, headers=headers, json={"labels": [label_id]}
|
||
)
|
||
return apply_resp.status_code in (200, 201)
|
||
|
||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||
logger.warning("Failed to add label %r to #%d: %s", label_name, issue_number, exc)
|
||
return False
|
||
|
||
|
||
# ── Decision execution ───────────────────────────────────────────────────────
|
||
|
||
|
||
async def execute_decision(
|
||
client: httpx.AsyncClient,
|
||
decision: TriageDecision,
|
||
dry_run: bool = False,
|
||
) -> TriageDecision:
|
||
"""Execute a triage decision — comment + assign/label.
|
||
|
||
When dry_run=True, logs the decision but makes no Gitea API calls.
|
||
Returns the updated decision with executed=True on success.
|
||
"""
|
||
num = decision.issue_number
|
||
|
||
if decision.action == "skip":
|
||
logger.debug("Triage skip #%d: %s", num, decision.reason)
|
||
decision.executed = True
|
||
return decision
|
||
|
||
audit_comment = _build_audit_comment(decision)
|
||
|
||
if dry_run:
|
||
logger.info(
|
||
"[DRY RUN] #%d → %s (%s): %s",
|
||
num,
|
||
decision.action,
|
||
decision.agent,
|
||
decision.reason,
|
||
)
|
||
decision.executed = True
|
||
return decision
|
||
|
||
# Post audit comment first (always, so Alex can see reasoning)
|
||
comment_ok = await post_comment(client, num, audit_comment)
|
||
if not comment_ok:
|
||
decision.error = "Failed to post audit comment"
|
||
logger.warning("Triage #%d: comment failed", num)
|
||
return decision
|
||
|
||
# Execute assignment
|
||
ok = False
|
||
if decision.action == "assign_claude":
|
||
ok = await assign_issue(client, num, AGENT_CLAUDE)
|
||
elif decision.action == "assign_kimi":
|
||
ok = await add_label(client, num, KIMI_READY_LABEL)
|
||
elif decision.action == "flag_alex":
|
||
# Comment already posted above — that's sufficient for flagging
|
||
ok = True
|
||
|
||
if ok:
|
||
decision.executed = True
|
||
logger.info("Triage #%d → %s OK", num, decision.action)
|
||
else:
|
||
decision.error = f"Action {decision.action!r} failed"
|
||
logger.warning("Triage #%d: action %r failed", num, decision.action)
|
||
|
||
return decision
|
||
|
||
|
||
def _build_audit_comment(decision: TriageDecision) -> str:
|
||
"""Build the audit trail comment that Alex can read to see reasoning."""
|
||
ts = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
|
||
action_text = {
|
||
"assign_claude": f"Assigning to @{AGENT_CLAUDE} for implementation.",
|
||
"assign_kimi": f"Adding `{KIMI_READY_LABEL}` label — queuing for Kimi research agent.",
|
||
"flag_alex": f"Flagging for @{OWNER_LOGIN} — issue appears blocked or needs human decision.",
|
||
}.get(decision.action, decision.action)
|
||
|
||
return (
|
||
f"**[Timmy Triage — {ts}]**\n\n"
|
||
f"**Decision:** {action_text}\n\n"
|
||
f"**Why:** {decision.reason}\n\n"
|
||
f"*Autonomous triage by Timmy. Reply to override.*"
|
||
)
|
||
|
||
|
||
# ── Daily summary ─────────────────────────────────────────────────────────────
|
||
|
||
|
||
def _build_daily_summary(result: TriageCycleResult, scored: list[ScoredIssue]) -> str:
|
||
"""Build the daily triage summary body."""
|
||
now = datetime.now(UTC).strftime("%Y-%m-%d %H:%M UTC")
|
||
assigned = [d for d in result.decisions if d.executed and d.action != "skip"]
|
||
skipped = [d for d in result.decisions if d.action == "skip"]
|
||
|
||
lines = [
|
||
f"# Timmy Backlog Triage — {now}",
|
||
"",
|
||
f"**Open issues:** {result.total_open} | "
|
||
f"**Scored:** {result.scored} | "
|
||
f"**Ready:** {result.ready} | "
|
||
f"**Assigned this cycle:** {len(assigned)}",
|
||
"",
|
||
"## Top 10 Ready Issues (by score)",
|
||
"",
|
||
]
|
||
|
||
top = sorted([s for s in scored if s.ready], key=lambda s: (-s.score, s.number))[:10]
|
||
for s in top:
|
||
flag = "🐛" if s.issue_type == "bug" else "⚡" if s.is_p0 else "✦"
|
||
lines.append(
|
||
f"- {flag} **#{s.number}** (score={s.score}, age={s.age_days}d) — {s.title[:80]}"
|
||
)
|
||
|
||
if assigned:
|
||
lines += ["", "## Actions Taken", ""]
|
||
for d in assigned:
|
||
lines.append(f"- #{d.issue_number} → `{d.action}` ({d.agent}): {d.reason[:100]}")
|
||
|
||
if skipped:
|
||
lines += ["", f"## Skipped ({len(skipped)} issues)", ""]
|
||
for d in skipped[:5]:
|
||
lines.append(f"- #{d.issue_number}: {d.reason[:80]}")
|
||
if len(skipped) > 5:
|
||
lines.append(f"- … and {len(skipped) - 5} more")
|
||
|
||
lines += [
|
||
"",
|
||
"---",
|
||
"*Auto-generated by Timmy's backlog triage loop. "
|
||
"Override any decision by reassigning or commenting.*",
|
||
]
|
||
return "\n".join(lines)
|
||
|
||
|
||
async def post_daily_summary(
|
||
client: httpx.AsyncClient,
|
||
result: TriageCycleResult,
|
||
scored: list[ScoredIssue],
|
||
dry_run: bool = False,
|
||
) -> bool:
|
||
"""Post a daily triage summary as a new Gitea issue."""
|
||
today = datetime.now(UTC).strftime("%Y-%m-%d")
|
||
title = f"[Triage] Daily backlog summary — {today}"
|
||
body = _build_daily_summary(result, scored)
|
||
|
||
if dry_run:
|
||
logger.info("[DRY RUN] Would post daily summary: %s", title)
|
||
return True
|
||
|
||
url = _repo_url("issues")
|
||
try:
|
||
resp = await client.post(
|
||
url,
|
||
headers=_api_headers(),
|
||
json={
|
||
"title": title,
|
||
"body": body,
|
||
"labels": [],
|
||
},
|
||
)
|
||
if resp.status_code in (200, 201):
|
||
issue_num = resp.json().get("number", "?")
|
||
logger.info("Daily triage summary posted as issue #%s", issue_num)
|
||
return True
|
||
logger.warning("Daily summary post failed (HTTP %s)", resp.status_code)
|
||
return False
|
||
except (httpx.ConnectError, httpx.ReadError, httpx.TimeoutException) as exc:
|
||
logger.warning("Failed to post daily summary: %s", exc)
|
||
return False
|
||
|
||
|
||
# ── Main loop class ───────────────────────────────────────────────────────────
|
||
|
||
|
||
class BacklogTriageLoop:
|
||
"""Autonomous backlog triage loop.
|
||
|
||
Fetches, scores, and assigns Gitea issues on a configurable interval.
|
||
|
||
Parameters
|
||
----------
|
||
interval:
|
||
Seconds between triage cycles. Default: settings.backlog_triage_interval_seconds.
|
||
dry_run:
|
||
When True, score and log decisions but don't write to Gitea.
|
||
daily_summary:
|
||
When True, post a daily triage summary issue after each cycle.
|
||
"""
|
||
|
||
def __init__(
|
||
self,
|
||
*,
|
||
interval: float | None = None,
|
||
dry_run: bool | None = None,
|
||
daily_summary: bool | None = None,
|
||
) -> None:
|
||
self._interval = float(interval or settings.backlog_triage_interval_seconds)
|
||
self._dry_run = dry_run if dry_run is not None else settings.backlog_triage_dry_run
|
||
self._daily_summary = (
|
||
daily_summary if daily_summary is not None else settings.backlog_triage_daily_summary
|
||
)
|
||
self._running = False
|
||
self._task: asyncio.Task | None = None
|
||
self._cycle_count = 0
|
||
self._last_summary_date: str = ""
|
||
self.history: list[TriageCycleResult] = []
|
||
|
||
@property
|
||
def is_running(self) -> bool:
|
||
return self._running
|
||
|
||
@property
|
||
def cycle_count(self) -> int:
|
||
return self._cycle_count
|
||
|
||
async def run_once(self) -> TriageCycleResult:
|
||
"""Execute one full triage cycle.
|
||
|
||
1. Fetch all open Gitea issues
|
||
2. Score and prioritize
|
||
3. Decide on each unassigned ready issue
|
||
4. Execute decisions
|
||
5. Optionally post daily summary
|
||
"""
|
||
import time
|
||
|
||
self._cycle_count += 1
|
||
start = time.monotonic()
|
||
ts = datetime.now(UTC).isoformat()
|
||
result = TriageCycleResult(timestamp=ts, total_open=0, scored=0, ready=0)
|
||
|
||
if not settings.gitea_enabled or not settings.gitea_token:
|
||
logger.warning("Backlog triage: Gitea not configured — skipping cycle")
|
||
return result
|
||
|
||
async with httpx.AsyncClient(timeout=30) as client:
|
||
# 1. Fetch
|
||
raw_issues = await fetch_open_issues(client)
|
||
result.total_open = len(raw_issues)
|
||
logger.info("Triage cycle #%d: fetched %d open issues", self._cycle_count, len(raw_issues))
|
||
|
||
# 2. Score
|
||
scored = [score_issue(i) for i in raw_issues]
|
||
result.scored = len(scored)
|
||
result.ready = sum(1 for s in scored if s.ready)
|
||
|
||
# 3 & 4. Decide and execute for each issue
|
||
for issue in scored:
|
||
decision = decide(issue)
|
||
if decision.action == "skip":
|
||
result.decisions.append(decision)
|
||
continue
|
||
decision = await execute_decision(client, decision, dry_run=self._dry_run)
|
||
result.decisions.append(decision)
|
||
|
||
# Rate-limit: short pause between API writes to avoid hammering Gitea
|
||
if not self._dry_run:
|
||
await asyncio.sleep(0.5)
|
||
|
||
# 5. Daily summary (once per UTC day)
|
||
today = datetime.now(UTC).strftime("%Y-%m-%d")
|
||
if self._daily_summary and today != self._last_summary_date:
|
||
await post_daily_summary(client, result, scored, dry_run=self._dry_run)
|
||
self._last_summary_date = today
|
||
|
||
result.duration_ms = int((time.monotonic() - start) * 1000)
|
||
self.history.append(result)
|
||
|
||
assigned_count = sum(1 for d in result.decisions if d.executed and d.action != "skip")
|
||
logger.info(
|
||
"Triage cycle #%d complete (%d ms): %d open, %d ready, %d assigned",
|
||
self._cycle_count,
|
||
result.duration_ms,
|
||
result.total_open,
|
||
result.ready,
|
||
assigned_count,
|
||
)
|
||
return result
|
||
|
||
async def start(self) -> None:
|
||
"""Start the triage loop as a background task."""
|
||
if self._running:
|
||
logger.warning("BacklogTriageLoop already running")
|
||
return
|
||
self._running = True
|
||
await self._loop()
|
||
|
||
async def _loop(self) -> None:
|
||
logger.info(
|
||
"BacklogTriageLoop started (interval=%.0fs, dry_run=%s)",
|
||
self._interval,
|
||
self._dry_run,
|
||
)
|
||
while self._running:
|
||
try:
|
||
await self.run_once()
|
||
except Exception:
|
||
logger.exception("Backlog triage cycle failed")
|
||
await asyncio.sleep(self._interval)
|
||
|
||
def stop(self) -> None:
|
||
"""Signal the loop to stop after the current cycle."""
|
||
self._running = False
|
||
logger.info("BacklogTriageLoop stop requested")
|