#!/usr/bin/env python3
"""
pr_backlog_triage.py — Automated PR backlog analysis for Gitea repos (Issue #658).

Analyzes open PRs: categorizes, finds duplicates, detects stale references
to closed issues, and generates a triage report.

Usage:
    python3 scripts/pr_backlog_triage.py Timmy_Foundation/timmy-config
    python3 scripts/pr_backlog_triage.py Timmy_Foundation/hermes-agent --close-stale --no-dry-run
    python3 scripts/pr_backlog_triage.py Timmy_Foundation/the-nexus --json
    python3 scripts/pr_backlog_triage.py --org Timmy_Foundation  # All repos
"""
import argparse
import json
import os
import re
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

GITEA_URL = "https://forge.alexanderwhitestone.com"
# Matches "#123"-style issue references anywhere in a PR title/body.
ISSUE_PATTERN = re.compile(r"#(\d+)")
# Keyword buckets checked in insertion order; the first bucket with a hit wins
# (see categorize_pr), so more specific categories are listed first.
CATEGORY_KEYWORDS = {
    "training_data": ["500", "pairs", "scene description", "lyrics", "prompt", "training data"],
    "bug_fix": ["fix", "bug", "patch", "hotfix", "resolve"],
    "feature": ["feat", "add", "implement", "feature"],
    "docs": ["doc", "readme", "changelog"],
    "ops": ["ops", "deploy", "ci", "cd", "pipeline"],
}


def get_token() -> str:
    """Read the Gitea API token from ~/.config/gitea/token or $GITEA_TOKEN.

    Exits the process with status 1 when neither source yields a token.
    """
    path = Path(os.path.expanduser("~/.config/gitea/token"))
    if path.exists():
        token = path.read_text().strip()
        # An existing-but-empty file falls through to the environment variable
        # (the original returned "" here and callers sent a blank token).
        if token:
            return token
    token = os.environ.get("GITEA_TOKEN", "")
    if not token:
        print("ERROR: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token", file=sys.stderr)
        sys.exit(1)
    return token


def api_get(path: str, token: str, params: Optional[dict] = None) -> Any:
    """GET ``path`` from the Gitea API.

    Returns the parsed JSON response, or None on a 404. Other HTTP errors
    propagate to the caller.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    if params:
        # urlencode() percent-escapes keys and values; the original manual
        # f-string join produced broken URLs for values with spaces or '&'.
        url = f"{url}?{urlencode(params)}"
    req = Request(url, headers={"Authorization": f"token {token}"})
    try:
        with urlopen(req, timeout=30) as resp:  # context manager closes the socket
            return json.loads(resp.read())
    except HTTPError as e:
        if e.code == 404:
            return None
        raise


def _api_send(method: str, path: str, token: str, data: dict) -> Any:
    """Send a JSON body to the Gitea API with the given HTTP method.

    Shared implementation for api_patch/api_post. Returns parsed JSON, or
    None on any HTTP error (best-effort writes: callers treat None as
    "request failed" and carry on).
    """
    url = f"{GITEA_URL}/api/v1{path}"
    body = json.dumps(data).encode()
    req = Request(url, data=body, headers={
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
    }, method=method)
    try:
        with urlopen(req, timeout=30) as resp:  # context manager closes the socket
            return json.loads(resp.read())
    except HTTPError:
        return None


def api_patch(path: str, token: str, data: dict) -> Any:
    """PATCH to Gitea API."""
    return _api_send("PATCH", path, token, data)


def api_post(path: str, token: str, data: dict) -> Any:
    """POST to Gitea API."""
    return _api_send("POST", path, token, data)


def categorize_pr(pr: dict) -> str:
    """Return the first CATEGORY_KEYWORDS bucket whose keyword appears in the
    PR title (case-insensitive substring match), or "other"."""
    title = (pr.get("title") or "").lower()
    for category, keywords in CATEGORY_KEYWORDS.items():
        if any(kw in title for kw in keywords):
            return category
    return "other"


def extract_refs(pr: dict) -> List[int]:
    """Return the sorted, de-duplicated issue numbers referenced (as "#N")
    in the PR's title and body."""
    text = ((pr.get("title") or "") + " " + (pr.get("body") or ""))
    return sorted(set(int(n) for n in ISSUE_PATTERN.findall(text)))


def find_duplicates(prs: List[dict]) -> Dict[int, List[int]]:
    """Map issue number -> list of PR numbers, for issues referenced by
    more than one open PR (likely duplicate work)."""
    issue_to_prs: Dict[int, List[int]] = {}
    for pr in prs:
        for ref in extract_refs(pr):
            issue_to_prs.setdefault(ref, []).append(pr["number"])
    return {k: v for k, v in issue_to_prs.items() if len(v) > 1}


def find_stale(prs: List[dict], closed_issue_nums: set) -> List[dict]:
    """Return [{"pr": N, "closed_refs": [...]}] for PRs that reference at
    least one already-closed issue."""
    stale = []
    for pr in prs:
        refs = extract_refs(pr)
        closed_refs = [r for r in refs if r in closed_issue_nums]
        if closed_refs:
            stale.append({"pr": pr["number"], "closed_refs": closed_refs})
    return stale


def analyze_repo(repo: str, token: str, limit: int = 100) -> dict:
    """Full triage analysis for one repo.

    Fetches up to ``limit`` open PRs and up to 200 recently closed issues
    (single page each — NOTE(review): no pagination, so very large backlogs
    are truncated). Returns a dict with counts, per-category details,
    duplicate map, and stale-PR list; or {"error": ...} when the PR fetch
    fails.
    """
    # Fetch open PRs
    prs = api_get(f"/repos/{repo}/pulls", token, {"state": "open", "limit": str(limit)})
    if prs is None:
        return {"error": f"Could not fetch PRs for {repo}"}

    # Fetch closed issues for stale detection
    closed_issues = api_get(f"/repos/{repo}/issues", token, {"state": "closed", "limit": "200"})
    closed_nums = set()
    if closed_issues:
        # Gitea's /issues endpoint can include PRs; keep genuine issues only.
        closed_nums = {iss["number"] for iss in closed_issues if not iss.get("pull_request")}

    # Categorize
    categories: Dict[str, List[dict]] = {}
    for pr in prs:
        cat = categorize_pr(pr)
        categories.setdefault(cat, []).append({
            "number": pr["number"],
            "title": pr.get("title", ""),
            "head": pr.get("head", {}).get("ref", ""),
            "refs": extract_refs(pr),
            "additions": pr.get("additions", 0),
            "deletions": pr.get("deletions", 0),
            "changed_files": pr.get("changed_files", 0),
            "created": pr.get("created_at", ""),
        })

    duplicates = find_duplicates(prs)
    stale = find_stale(prs, closed_nums)

    return {
        "repo": repo,
        "total_open": len(prs),
        "categories": {k: len(v) for k, v in categories.items()},
        "category_details": categories,
        "duplicates": duplicates,
        "stale_prs": stale,
        "closed_issues_checked": len(closed_nums),
    }


def close_stale_prs(stale: List[dict], repo: str, token: str, dry_run: bool = True) -> List[dict]:
    """Close PRs that reference closed issues.

    When ``dry_run`` is true (the default) nothing is sent to the API; each
    entry is reported with action "would_close". Otherwise a comment
    explaining the closure is posted and the PR is closed.
    """
    closed = []
    for item in stale:
        pr_num = item["pr"]
        refs = item["closed_refs"]
        if dry_run:
            closed.append({"pr": pr_num, "action": "would_close", "refs": refs})
            continue
        # Comment explaining closure
        api_post(f"/repos/{repo}/issues/{pr_num}/comments", token, {
            "body": f"Closing: references closed issue(s) {', '.join(f'#{r}' for r in refs)}. Triage cleanup."
        })
        # Close the PR
        api_patch(f"/repos/{repo}/pulls/{pr_num}", token, {"state": "closed"})
        closed.append({"pr": pr_num, "action": "closed", "refs": refs})
    return closed


def format_report(analysis: dict) -> str:
    """Format a triage analysis dict (from analyze_repo) as a markdown report."""
    lines = [
        f"## PR Backlog Triage — {analysis['repo']}",
        "",
        f"**Total open PRs:** {analysis['total_open']}",
        f"**Closed issues checked:** {analysis['closed_issues_checked']}",
        "",
        "### Categories",
        "",
        "| Category | Count |",
        "|----------|-------|",
    ]
    for cat, count in sorted(analysis["categories"].items()):
        lines.append(f"| {cat} | {count} |")

    # Duplicates
    if analysis["duplicates"]:
        lines.extend(["", "### Duplicate PRs (same issue referenced)", ""])
        for issue, pr_nums in analysis["duplicates"].items():
            lines.append(f"- Issue #{issue}: PRs {pr_nums}")

    # Stale
    if analysis["stale_prs"]:
        lines.extend(["", "### Stale PRs (reference closed issues)", ""])
        for item in analysis["stale_prs"]:
            lines.append(f"- PR #{item['pr']}: references closed {', '.join(f'#{r}' for r in item['closed_refs'])}")

    # Details per category
    for cat, items in analysis.get("category_details", {}).items():
        if not items:
            continue
        lines.extend(["", f"### {cat.replace('_', ' ').title()} ({len(items)})", ""])
        for pr in items:
            refs_str = f" (refs: {', '.join(f'#{r}' for r in pr['refs'])})" if pr["refs"] else ""
            lines.append(f"- #{pr['number']}: {pr['title'][:70]}{refs_str}")

    return "\n".join(lines)


def format_json(analysis: dict) -> str:
    """Format as JSON."""
    return json.dumps(analysis, indent=2, default=str)


def main():
    """CLI entry point: analyze one repo (positional) or every repo in --org.

    Exits 1 when any stale PRs are found, so CI can gate on a clean backlog.
    """
    parser = argparse.ArgumentParser(description="PR backlog triage for Gitea repos")
    parser.add_argument("repo", nargs="?", help="Repo path (e.g. Timmy_Foundation/timmy-config)")
    parser.add_argument("--org", help="Triage all repos in org (instead of single repo)")
    parser.add_argument("--close-stale", action="store_true", help="Close PRs referencing closed issues")
    # BUG FIX: the original used action="store_true" with default=True, which
    # made --dry-run permanently True — --close-stale could never actually
    # close anything. BooleanOptionalAction keeps --dry-run as the default
    # and adds --no-dry-run to really perform closures.
    parser.add_argument("--dry-run", action=argparse.BooleanOptionalAction, default=True,
                        help="Don't actually close (default; pass --no-dry-run to close)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--token", help="Gitea token (overrides config file)")
    args = parser.parse_args()

    token = args.token or get_token()

    # Determine repos to analyze
    repos = []
    if args.org:
        org_repos = api_get(f"/orgs/{args.org}/repos", token, {"limit": "50"})
        if org_repos:
            repos = [r["full_name"] for r in org_repos]
    elif args.repo:
        repos = [args.repo]
    else:
        parser.error("Provide REPO or --org")

    all_analyses = []
    for repo in repos:
        analysis = analyze_repo(repo, token)
        if "error" in analysis:
            print(f"SKIP: {analysis['error']}", file=sys.stderr)
            continue
        all_analyses.append(analysis)

        # Close stale if requested (honors --dry-run / --no-dry-run directly)
        if args.close_stale and analysis["stale_prs"]:
            closed = close_stale_prs(analysis["stale_prs"], repo, token, dry_run=args.dry_run)
            analysis["closed_actions"] = closed

    # Output
    if args.json:
        output = format_json(all_analyses[0] if len(all_analyses) == 1 else all_analyses)
    else:
        parts = [format_report(a) for a in all_analyses]
        output = "\n\n---\n\n".join(parts)

    if args.output:
        Path(args.output).write_text(output, encoding="utf-8")
        print(f"Report written to {args.output}")
    else:
        print(output)

    # Exit 1 if any stale PRs found (CI mode)
    total_stale = sum(len(a.get("stale_prs", [])) for a in all_analyses)
    if total_stale > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()