Files
timmy-config/scripts/triage_backlog.py
Hermes Agent (STEP35) 887f4a27a4
Some checks failed
Architecture Lint / Linter Tests (pull_request) Successful in 29s
Validate Config / YAML Lint (pull_request) Failing after 14s
Smoke Test / smoke (pull_request) Failing after 22s
Validate Config / JSON Validate (pull_request) Successful in 22s
Validate Config / Python Syntax & Import Check (pull_request) Failing after 1m6s
Validate Config / Python Test Suite (pull_request) Has been skipped
Validate Config / Shell Script Lint (pull_request) Failing after 1m5s
Validate Config / Cron Syntax Check (pull_request) Successful in 12s
Validate Config / Deploy Script Dry Run (pull_request) Successful in 13s
Validate Config / Playbook Schema Validation (pull_request) Successful in 30s
PR Checklist / pr-checklist (pull_request) Successful in 4m36s
Architecture Lint / Lint Repository (pull_request) Failing after 23s
[AUDIT] Implement issue backlog triage script for #478
Add scripts/triage_backlog.py — a mechanized triage tool for the
timmy-config issue backlog. Implements the smallest concrete fix
required by #478: close stale issues (>14d inactive) and apply
P0/P1/P2/P3 priority labels to remaining open issues.

Features:
- Fetches all open issues via Gitea API (type=issues filter)
- Detects stale issues: no activity for STALE_DAYS (14)
- Identifies potential duplicates by normalized title
- Assigns priority labels (P0=critical/security, P1=high/bugs,
  P2=medium, P3=low/enhancement)
- Creates P0-P3 labels if missing in the target repo
- Dry-run default; --close-stale to enact closures
- JSON output mode for automation; --output for report files
- Exit code 1 when stale issues found (CI-friendly)

Tests (tests/test_triage_backlog.py): 11 tests covering
stale detection, duplicate normalization, and priority heuristics.

Closes #478
2026-04-30 10:15:46 -04:00

431 lines
16 KiB
Python
Executable File

#!/usr/bin/env python3
"""
triage_backlog.py — Automated issue backlog triage for Gitea repos (Issue #478).
Closes stale issues (>14 days inactive) and applies P0/P1/P2/P3 priority labels
to remaining open issues. Generates a triage report.
Usage:
python3 scripts/triage_backlog.py Timmy_Foundation/timmy-config
python3 scripts/triage_backlog.py Timmy_Foundation/timmy-config --close-stale
python3 scripts/triage_backlog.py --org Timmy_Foundation --dry-run
python3 scripts/triage_backlog.py Timmy_Foundation/hermes-agent --json
"""
import argparse
import json
import os
import re
import sys
from collections import defaultdict
from datetime import datetime, timezone, timedelta
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
# Base URL of the Gitea instance whose REST API this script calls.
GITEA_URL = "https://forge.alexanderwhitestone.com"
# Staleness threshold: 14 days of no updates
STALE_DAYS = 14
# Priority label names (created in the target repo if missing)
PRIORITY_LABELS = ["P0", "P1", "P2", "P3"]
# Existing priority/critical labels to consider for P0 mapping
CRITICAL_LABELS = {"critical", "p0-test"}
def get_token() -> str:
    """Return the Gitea API token.

    Looks at ~/.config/gitea/token first, then the GITEA_TOKEN environment
    variable. Exits with status 1 (after printing to stderr) if neither
    source yields a token.
    """
    token_file = Path(os.path.expanduser("~/.config/gitea/token"))
    if token_file.exists():
        return token_file.read_text().strip()
    env_token = os.environ.get("GITEA_TOKEN", "")
    if env_token:
        return env_token
    print("ERROR: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token", file=sys.stderr)
    sys.exit(1)
def api(method: str, path: str, token: str, data: Optional[dict] = None, params: Optional[dict] = None) -> Any:
    """Call the Gitea REST API and return the decoded JSON response.

    Args:
        method: HTTP verb ("GET", "POST", "PATCH", ...).
        path: API path under /api/v1, e.g. "/repos/owner/name/issues".
        token: Gitea API token, sent as an Authorization header.
        data: Optional dict serialized as the JSON request body.
        params: Optional query parameters; values are URL-encoded.

    Returns:
        The parsed JSON payload on success, or on HTTP failure a dict of
        {"_error": status_code, "_body": first 300 chars of error body}.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    if params:
        # urlencode handles spaces and special characters safely; the
        # previous hand-rolled join produced invalid URLs for such values.
        url += "?" + urlencode(params)
    body = json.dumps(data).encode() if data is not None else None
    req = Request(url, data=body, headers={
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
    }, method=method)
    try:
        # Close the response explicitly instead of leaking the connection.
        with urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except HTTPError as e:
        err_body = e.read().decode() if e.fp else ""
        return {"_error": e.code, "_body": err_body[:300]}
def ensure_priority_labels(repo: str, token: str) -> Dict[str, int]:
    """Ensure the P0-P3 priority labels exist in *repo*.

    Creates any missing labels via the API and returns a mapping of
    label name -> label id for the priority labels that could be resolved.
    Unlike the original version, the label list is only re-fetched when at
    least one label actually had to be created (saves one API round-trip).
    """
    ids: Dict[str, int] = {}

    def _collect() -> None:
        # Record ids of any priority labels currently present in the repo.
        fetched = api("GET", f"/repos/{repo}/labels", token, params={"per_page": "100"})
        if isinstance(fetched, list):
            for lbl in fetched:
                if lbl["name"] in PRIORITY_LABELS:
                    ids[lbl["name"]] = lbl["id"]

    _collect()
    colors = {"P0": "#FF0000", "P1": "#FF7F00", "P2": "#FFFF00", "P3": "#ADFF2F"}
    descs = {
        "P0": "Critical priority — must fix immediately",
        "P1": "High priority — fix soon",
        "P2": "Medium priority — normal backlog",
        "P3": "Low priority — nice to have",
    }
    missing = [pl for pl in PRIORITY_LABELS if pl not in ids]
    for pl in missing:
        api("POST", f"/repos/{repo}/labels", token, {
            "name": pl,
            "color": colors[pl],
            "description": descs[pl],
        })
    if missing:
        # Re-fetch only when we created something, to pick up the new ids.
        _collect()
    return ids
def fetch_open_issues(repo: str, token: str, quiet: bool = False) -> List[dict]:
    """Fetch all open issues (excluding PRs) for *repo*, paginating the API.

    Args:
        repo: "owner/name" repository path.
        token: Gitea API token.
        quiet: When True, suppress the pagination-cap warning.

    Returns:
        A list of issue dicts as returned by the Gitea issues endpoint.
    """
    issues: List[dict] = []
    page = 1
    per_page = 100
    while True:
        batch = api("GET", f"/repos/{repo}/issues", token, params={
            "state": "open",
            "type": "issues",  # exclude PRs at API level
            "limit": str(per_page),
            "page": str(page),
            "sort": "created",
            "direction": "desc",
        })
        # An error dict (non-list) or an empty page means we are done.
        if not isinstance(batch, list) or not batch:
            break
        for iss in batch:
            # Belt-and-braces: skip anything that still looks like a PR.
            if iss.get("pull_request") is None:
                issues.append(iss)
        page += 1
        if page > 20:  # safety cap (~2000 issues)
            if not quiet:
                # Diagnostics must go to stderr — stdout carries the
                # report/JSON output and was being corrupted by this print.
                print(f" WARNING: pagination cap at page {page}", file=sys.stderr)
            break
    return issues
def is_stale(issue: dict, days: int = STALE_DAYS) -> bool:
    """Return True when *issue* has had no activity for at least *days* days.

    Activity is judged by the updated_at timestamp, falling back to
    created_at; an issue with neither timestamp is never considered stale.
    """
    timestamp = issue.get("updated_at") or issue.get("created_at")
    if not timestamp:
        return False
    last_activity = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
    idle_days = (datetime.now(timezone.utc) - last_activity).days
    return idle_days >= days
def find_duplicate_candidates(issues: List[dict]) -> Dict[str, List[int]]:
    """Group open issues whose normalized titles collide.

    Titles are lowercased, trimmed, and stripped of conventional prefixes
    ("[bug]"-style tags and "fix:"-style type markers) before comparison.
    Titles shorter than 10 characters after normalization are ignored as
    too generic to signal duplication.

    Returns:
        {normalized_title: [issue numbers]} for every normalized title
        shared by more than one issue. (Fixed annotation: keys are the
        normalized title strings, not ints as previously declared.)
    """

    def normalize(raw: str) -> str:
        # Lowercase, trim, then peel off conventional prefixes in order.
        text = raw.lower().strip()
        text = re.sub(r'^\[(bug|feat|docs|fix|chore|refactor|test|build|ci|ops|security|a11y|enhancement|research|adversary)\]', '', text)
        text = re.sub(r'^\[[^\]]+\]\s*', '', text)
        text = re.sub(r'^\w+:\s*', '', text)  # "fix:", "feat:", etc.
        return text.strip()

    collisions: Dict[str, List[int]] = {}
    for issue in issues:
        key = normalize(issue.get("title", ""))
        if len(key) < 10:
            continue  # Too short to be meaningful
        collisions.setdefault(key, []).append(issue["number"])
    return {title: nums for title, nums in collisions.items() if len(nums) > 1}
def assign_priority(issue: dict, all_issues: List[dict]) -> Optional[str]:
    """Pick a P0/P1/P2/P3 bucket for *issue* using keyword/activity heuristics.

    Checked in order: critical labels, security keywords, or many "#N"
    references in the issue's own title/body (P0); heavy comment activity,
    bug-flavored titles, or urgency language (P1); old-and-quiet or
    enhancement-flavored issues (P3); everything else defaults to P2.
    *all_issues* is accepted for future cross-issue heuristics but is not
    currently consulted.
    """
    label_names = {lbl["name"].lower() for lbl in issue.get("labels", [])}
    title = (issue.get("title") or "").lower()
    body = (issue.get("body") or "").lower()
    n_comments = issue.get("comments", 0)
    n_issue_refs = len(re.findall(r"#(\d+)", f"{title} {body}"))

    # --- P0: critical labels, security keywords, or heavy cross-linking ---
    if label_names & CRITICAL_LABELS:
        return "P0"
    if any(kw in title or kw in body
           for kw in ("security", "vulnerability", "xss", "injection", "auth bypass", "critical")):
        return "P0"
    if n_issue_refs >= 5:
        return "P0"

    # --- P1: active discussion, bug-like titles, urgency language ---
    if n_comments >= 5:
        return "P1"
    if any(kw in title for kw in ("fix", "bug", "broken", "regression", "failure")):
        return "P1"
    if any(kw in title or kw in body for kw in ("urgency", "asap", "immediately", "blocker")):
        return "P1"

    # --- P3: old and quiet, or enhancement-flavored ---
    created = datetime.fromisoformat(issue["created_at"].replace("Z", "+00:00"))
    age_days = (datetime.now(timezone.utc) - created).days
    if age_days > 180 and n_comments <= 1:
        return "P3"
    if any(kw in title for kw in ("enhancement", "improve", "consider", "maybe", "wishlist")):
        return "P3"

    # --- P2: the default middle bucket ---
    return "P2"
def close_issue(issue_num: int, repo: str, token: str, reason: str, dry_run: bool = True) -> dict:
    """Close issue *issue_num* in *repo* with an explanatory comment.

    In dry-run mode nothing is sent to the API and the returned action
    record reports "would_close" instead of "closed".
    """
    record = {
        "issue": issue_num,
        "action": "would_close" if dry_run else "closed",
        "reason": reason,
    }
    if not dry_run:
        # Leave an audit-trail comment first, then flip the state.
        api("POST", f"/repos/{repo}/issues/{issue_num}/comments", token, {
            "body": f"Closing as {reason}. Triage cleanup per #478."
        })
        api("PATCH", f"/repos/{repo}/issues/{issue_num}", token, {"state": "closed"})
    return record
def apply_label(issue_num: int, repo: str, token: str, label_id: int, dry_run: bool = True) -> dict:
    """Attach the label with *label_id* to issue *issue_num*.

    Dry-run mode records the intent ("would_label") without calling the API.
    """
    record = {
        "issue": issue_num,
        "label_id": label_id,
        "action": "would_label" if dry_run else "labeled",
    }
    if not dry_run:
        api("POST", f"/repos/{repo}/issues/{issue_num}/labels", token, {"labels": [label_id]})
    return record
def analyze_repo(repo: str, token: str, quiet: bool = False) -> dict:
    """Build a triage analysis of *repo*'s open issues.

    Fetches open issues, ensures the P0-P3 labels exist, flags stale
    issues, detects probable duplicates by normalized title, and assigns a
    priority to each non-stale issue.

    Removed dead code from the previous version: duplicate_issue_nums,
    stale_close_candidates, and non_stale were computed but never used.

    Args:
        repo: "owner/name" repository path.
        token: Gitea API token.
        quiet: Suppress progress diagnostics on stderr when True.

    Returns:
        A report dict with keys: repo, total_open, stale_issues,
        duplicate_groups, priority_counts, priority_details,
        priority_assignments, label_ids.
    """
    issues = fetch_open_issues(repo, token, quiet=quiet)
    if not quiet:
        print(f" Fetched {len(issues)} open issues", file=sys.stderr)

    # Ensure priority labels exist and grab their ids for later labeling.
    label_ids = ensure_priority_labels(repo, token)
    duplicate_groups = find_duplicate_candidates(issues)

    stale_issues: List[dict] = []
    priority_counts: Dict[str, int] = defaultdict(int)
    issues_by_priority: Dict[str, List[dict]] = defaultdict(list)
    priority_assignments: Dict[int, str] = {}

    for iss in issues:
        age_days = (datetime.now(timezone.utc) -
                    datetime.fromisoformat(iss["created_at"].replace("Z", "+00:00"))).days
        if is_stale(iss):
            # Stale issues are closure candidates; they get no priority.
            stale_issues.append({
                "number": iss["number"],
                "title": iss.get("title", ""),
                "created": iss["created_at"],
                "updated": iss.get("updated_at", ""),
                "age_days": age_days,
            })
            continue
        prio = assign_priority(iss, issues)
        priority_assignments[iss["number"]] = prio
        priority_counts[prio] += 1
        issues_by_priority[prio].append({
            "number": iss["number"],
            "title": iss.get("title", ""),
            "comments": iss.get("comments", 0),
            "age_days": age_days,
        })

    return {
        "repo": repo,
        "total_open": len(issues),
        "stale_issues": stale_issues,
        "duplicate_groups": [{"representative": nums[0], "members": nums}
                             for nums in duplicate_groups.values()],
        "priority_counts": dict(priority_counts),
        "priority_details": dict(issues_by_priority),
        "priority_assignments": priority_assignments,
        "label_ids": label_ids,
    }
def close_stale_issues(analysis: dict, repo: str, token: str, dry_run: bool = True) -> List[dict]:
    """Close every stale issue recorded in *analysis*.

    Returns one action record per issue (from close_issue); dry_run is
    forwarded so records say "would_close" when no mutation happens.
    """
    reason = f"stale (no activity for {STALE_DAYS}+ days)"
    return [
        close_issue(entry["number"], repo, token, reason, dry_run=dry_run)
        for entry in analysis["stale_issues"]
    ]
def apply_priority_labels(analysis: dict, repo: str, token: str, dry_run: bool = True) -> List[dict]:
    """Apply each assigned P0-P3 label to its (non-stale) issue.

    Issues whose priority label id is missing from analysis["label_ids"]
    are skipped. Returns one action record (with a "priority" key added)
    per label applied or dry-run recorded.
    """
    label_ids = analysis["label_ids"]
    actions: List[dict] = []
    for issue_num, priority in analysis["priority_assignments"].items():
        lid = label_ids.get(priority)
        if not lid:
            continue
        record = apply_label(issue_num, repo, token, lid, dry_run=dry_run)
        record["priority"] = priority
        actions.append(record)
    return actions
def format_report(analysis: dict) -> str:
    """Render a triage analysis dict as a markdown report string."""
    out: List[str] = []
    add = out.append

    add(f"## Issue Backlog Triage — {analysis['repo']}")
    add("")
    add(f"**Total open issues:** {analysis['total_open']}")
    add(f"**Stale threshold:** {STALE_DAYS} days")
    add("")
    add("### Summary")
    add("")
    add(f"- **Stale issues:** {len(analysis['stale_issues'])} (candidates for closure)")
    add("- **Priority breakdown:**")
    for prio in ("P0", "P1", "P2", "P3"):
        add(f"  - {prio}: {analysis['priority_counts'].get(prio, 0)}")
    add("")

    # Potential duplicates (capped at 10 groups).
    if analysis["duplicate_groups"]:
        add("### Potential Duplicates (similar titles)")
        add("")
        for grp in analysis["duplicate_groups"][:10]:
            add("- " + ", ".join(f"#{n}" for n in grp["members"]))
        add("")

    # Stale details, oldest first (capped at 20).
    if analysis["stale_issues"]:
        add("### Stale Issues (oldest first)")
        add("")
        oldest_first = sorted(analysis["stale_issues"], key=lambda it: it["age_days"], reverse=True)
        for it in oldest_first[:20]:
            add(f"- #{it['number']}: {it['title'][:60]} (age: {it['age_days']}d)")
        add("")

    # Per-priority listings (capped at 15 each).
    for prio in ("P0", "P1", "P2", "P3"):
        bucket = analysis["priority_details"].get(prio, [])
        if not bucket:
            continue
        add(f"### {prio} Priority ({len(bucket)})")
        add("")
        for it in bucket[:15]:
            add(f"- #{it['number']}: {it['title'][:60]} (comments: {it['comments']}, age: {it['age_days']}d)")
        add("")

    return "\n".join(out)
def format_json(analysis: dict) -> str:
    """Serialize *analysis* to pretty-printed JSON (non-JSON types via str)."""
    return json.dumps(analysis, default=str, indent=2)
def main():
    """CLI entry point: parse args, triage repo(s), emit report, set exit code."""
    parser = argparse.ArgumentParser(description="Issue backlog triage for Gitea repos")
    parser.add_argument("repo", nargs="?", help="Repo path (e.g. Timmy_Foundation/timmy-config)")
    parser.add_argument("--org", help="Triage all repos in org (instead of single repo)")
    parser.add_argument("--close-stale", action="store_true", help="Close stale issues (default: dry-run)")
    parser.add_argument("--dry-run", action="store_true", default=True, help="Don't actually close/label (default)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--token", help="Gitea token (overrides config file)")
    args = parser.parse_args()

    token = args.token or get_token()
    # --close-stale is the only switch that turns real mutations on.
    dry_run = args.dry_run and not args.close_stale

    # Resolve targets: every repo in an org, or the single positional repo.
    targets = []
    if args.org:
        org_repos = api("GET", f"/orgs/{args.org}/repos", token, params={"limit": "50"})
        if isinstance(org_repos, list):
            targets = [r["full_name"] for r in org_repos]
    elif args.repo:
        targets = [args.repo]
    else:
        parser.error("Provide REPO or --org")

    quiet = args.json
    analyses = []
    for repo in targets:
        if not quiet:
            print(f"\n=== Triage: {repo} ===", file=sys.stderr)
        analysis = analyze_repo(repo, token, quiet=quiet)
        if "error" in analysis:
            print(f"SKIP: {analysis['error']}", file=sys.stderr)
            continue

        # Close stale issues only when explicitly requested.
        if args.close_stale and analysis["stale_issues"]:
            if not quiet:
                print(f"Closing {len(analysis['stale_issues'])} stale issues...", file=sys.stderr)
            analysis["close_actions"] = close_stale_issues(analysis, repo, token, dry_run=dry_run)
        else:
            analysis["close_actions"] = []

        # Labels are applied only in live (non-dry-run) mode.
        if not dry_run and analysis["priority_assignments"]:
            if not quiet:
                print(f"Applying priority labels to {len(analysis['priority_assignments'])} issues...", file=sys.stderr)
            analysis["label_actions"] = apply_priority_labels(analysis, repo, token, dry_run=dry_run)
        else:
            analysis["label_actions"] = []

        analyses.append(analysis)

    # Emit the combined output: JSON for automation, markdown otherwise.
    if args.json:
        output = format_json(analyses[0] if len(analyses) == 1 else analyses)
    else:
        output = "\n\n---\n\n".join(format_report(a) for a in analyses)

    if args.output:
        Path(args.output).write_text(output, encoding="utf-8")
        if not quiet:
            print(f"Report written to {args.output}", file=sys.stderr)
    else:
        print(output)

    # Exit non-zero when any stale issues were found (CI helper).
    if sum(len(a.get("stale_issues", [])) for a in analyses) > 0:
        sys.exit(1)
# Script entry point; argparse errors and stale findings set the exit code.
if __name__ == "__main__":
    main()