#!/usr/bin/env python3
"""
pr_backlog_triage.py — Automated PR backlog analysis for Gitea repos (Issue #658).

Analyzes open PRs: categorizes, finds duplicates, detects stale references to
closed issues, and generates a triage report.

Usage:
    python3 scripts/pr_backlog_triage.py Timmy_Foundation/timmy-config
    python3 scripts/pr_backlog_triage.py Timmy_Foundation/hermes-agent --close-stale
    python3 scripts/pr_backlog_triage.py Timmy_Foundation/the-nexus --json
    python3 scripts/pr_backlog_triage.py --org Timmy_Foundation  # All repos
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request, urlopen

GITEA_URL = "https://forge.alexanderwhitestone.com"

# Matches "#123"-style issue references in PR titles/bodies.
ISSUE_PATTERN = re.compile(r"#(\d+)")

# Keyword buckets checked in order; first matching category wins.
CATEGORY_KEYWORDS = {
    "training_data": ["500", "pairs", "scene description", "lyrics", "prompt", "training data"],
    "bug_fix": ["fix", "bug", "patch", "hotfix", "resolve"],
    "feature": ["feat", "add", "implement", "feature"],
    "docs": ["doc", "readme", "changelog"],
    "ops": ["ops", "deploy", "ci", "cd", "pipeline"],
}


def get_token() -> str:
    """Read the Gitea API token from ~/.config/gitea/token or $GITEA_TOKEN.

    Exits with status 1 (after printing to stderr) if neither source yields
    a token.
    """
    path = Path(os.path.expanduser("~/.config/gitea/token"))
    if path.exists():
        return path.read_text().strip()
    token = os.environ.get("GITEA_TOKEN", "")
    if not token:
        print("ERROR: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token",
              file=sys.stderr)
        sys.exit(1)
    return token


def api_get(path: str, token: str, params: Optional[dict] = None) -> Any:
    """GET a Gitea API endpoint.

    Args:
        path: API path relative to /api/v1 (e.g. "/repos/org/repo/pulls").
        token: Gitea access token.
        params: Optional query parameters (URL-encoded automatically).

    Returns:
        Parsed JSON response, or None on HTTP 404. Other HTTP errors
        propagate to the caller.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    if params:
        # urlencode handles escaping; the previous manual join did not.
        url = f"{url}?{urlencode(params)}"
    req = Request(url, headers={"Authorization": f"token {token}"})
    try:
        with urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except HTTPError as e:
        if e.code == 404:
            return None
        raise


def _api_send(method: str, path: str, token: str, data: dict) -> Any:
    """Send a JSON body with the given HTTP method (shared by PATCH/POST).

    Best-effort: returns parsed JSON on success, None on any HTTP error
    (matches the original api_patch/api_post behavior of swallowing errors).
    """
    url = f"{GITEA_URL}/api/v1{path}"
    body = json.dumps(data).encode()
    req = Request(url, data=body, headers={
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
    }, method=method)
    try:
        with urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except HTTPError:
        return None


def api_patch(path: str, token: str, data: dict) -> Any:
    """PATCH to Gitea API. Returns parsed JSON, or None on HTTP error."""
    return _api_send("PATCH", path, token, data)


def api_post(path: str, token: str, data: dict) -> Any:
    """POST to Gitea API. Returns parsed JSON, or None on HTTP error."""
    return _api_send("POST", path, token, data)


def categorize_pr(pr: dict) -> str:
    """Categorize a PR by title keywords; returns "other" if nothing matches."""
    title = (pr.get("title") or "").lower()
    for category, keywords in CATEGORY_KEYWORDS.items():
        if any(kw in title for kw in keywords):
            return category
    return "other"


def extract_refs(pr: dict) -> List[int]:
    """Extract issue numbers referenced (as "#N") in PR title and body.

    Returns a sorted, de-duplicated list of issue numbers.
    """
    text = (pr.get("title") or "") + " " + (pr.get("body") or "")
    return sorted(set(int(n) for n in ISSUE_PATTERN.findall(text)))


def find_duplicates(prs: List[dict]) -> Dict[int, List[int]]:
    """Find issues referenced by more than one PR.

    Returns {issue_number: [pr_numbers]} for issues with 2+ referencing PRs.
    """
    issue_to_prs: Dict[int, List[int]] = {}
    for pr in prs:
        for ref in extract_refs(pr):
            issue_to_prs.setdefault(ref, []).append(pr["number"])
    return {k: v for k, v in issue_to_prs.items() if len(v) > 1}


def find_stale(prs: List[dict], closed_issue_nums: set) -> List[dict]:
    """Find PRs referencing closed issues.

    Returns a list of {"pr": pr_number, "closed_refs": [issue_numbers]}.
    """
    stale = []
    for pr in prs:
        refs = extract_refs(pr)
        closed_refs = [r for r in refs if r in closed_issue_nums]
        if closed_refs:
            stale.append({"pr": pr["number"], "closed_refs": closed_refs})
    return stale


def analyze_repo(repo: str, token: str, limit: int = 100) -> dict:
    """Full triage analysis for one repo.

    Fetches open PRs and recently closed issues, then categorizes PRs,
    detects duplicate issue references, and flags PRs pointing at closed
    issues. Returns {"error": ...} if the PR list cannot be fetched.
    """
    # Fetch open PRs
    prs = api_get(f"/repos/{repo}/pulls", token, {"state": "open", "limit": str(limit)})
    if prs is None:
        return {"error": f"Could not fetch PRs for {repo}"}
    # Fetch closed issues for stale detection (excluding PRs, which Gitea
    # also returns from the issues endpoint).
    closed_issues = api_get(f"/repos/{repo}/issues", token, {"state": "closed", "limit": "200"})
    closed_nums = set()
    if closed_issues:
        closed_nums = {iss["number"] for iss in closed_issues if not iss.get("pull_request")}
    # Categorize
    categories: Dict[str, List[dict]] = {}
    for pr in prs:
        cat = categorize_pr(pr)
        categories.setdefault(cat, []).append({
            "number": pr["number"],
            "title": pr.get("title", ""),
            "head": pr.get("head", {}).get("ref", ""),
            "refs": extract_refs(pr),
            "additions": pr.get("additions", 0),
            "deletions": pr.get("deletions", 0),
            "changed_files": pr.get("changed_files", 0),
            "created": pr.get("created_at", ""),
        })
    duplicates = find_duplicates(prs)
    stale = find_stale(prs, closed_nums)
    return {
        "repo": repo,
        "total_open": len(prs),
        "categories": {k: len(v) for k, v in categories.items()},
        "category_details": categories,
        "duplicates": duplicates,
        "stale_prs": stale,
        "closed_issues_checked": len(closed_nums),
    }


def close_stale_prs(stale: List[dict], repo: str, token: str, dry_run: bool = True) -> List[dict]:
    """Close PRs that reference closed issues.

    In dry-run mode (the default) only reports what would be closed.
    Otherwise posts an explanatory comment on each PR, then closes it.
    Returns a list of action records.
    """
    closed = []
    for item in stale:
        pr_num = item["pr"]
        refs = item["closed_refs"]
        if dry_run:
            closed.append({"pr": pr_num, "action": "would_close", "refs": refs})
            continue
        # Comment explaining closure
        api_post(f"/repos/{repo}/issues/{pr_num}/comments", token, {
            "body": f"Closing: references closed issue(s) {', '.join(f'#{r}' for r in refs)}. Triage cleanup."
        })
        # Close the PR
        api_patch(f"/repos/{repo}/pulls/{pr_num}", token, {"state": "closed"})
        closed.append({"pr": pr_num, "action": "closed", "refs": refs})
    return closed


def format_report(analysis: dict) -> str:
    """Format a triage analysis dict as a markdown report."""
    lines = [
        f"## PR Backlog Triage — {analysis['repo']}",
        "",
        f"**Total open PRs:** {analysis['total_open']}",
        f"**Closed issues checked:** {analysis['closed_issues_checked']}",
        "",
        "### Categories",
        "",
        "| Category | Count |",
        "|----------|-------|",
    ]
    for cat, count in sorted(analysis["categories"].items()):
        lines.append(f"| {cat} | {count} |")
    # Duplicates
    if analysis["duplicates"]:
        lines.extend(["", "### Duplicate PRs (same issue referenced)", ""])
        for issue, pr_nums in analysis["duplicates"].items():
            lines.append(f"- Issue #{issue}: PRs {pr_nums}")
    # Stale
    if analysis["stale_prs"]:
        lines.extend(["", "### Stale PRs (reference closed issues)", ""])
        for item in analysis["stale_prs"]:
            lines.append(f"- PR #{item['pr']}: references closed {', '.join(f'#{r}' for r in item['closed_refs'])}")
    # Details per category
    for cat, items in analysis.get("category_details", {}).items():
        if not items:
            continue
        lines.extend(["", f"### {cat.replace('_', ' ').title()} ({len(items)})", ""])
        for pr in items:
            refs_str = f" (refs: {', '.join(f'#{r}' for r in pr['refs'])})" if pr["refs"] else ""
            lines.append(f"- #{pr['number']}: {pr['title'][:70]}{refs_str}")
    return "\n".join(lines)


def format_json(analysis: dict) -> str:
    """Format the analysis as indented JSON (non-serializable values via str)."""
    return json.dumps(analysis, indent=2, default=str)


def main():
    parser = argparse.ArgumentParser(description="PR backlog triage for Gitea repos")
    parser.add_argument("repo", nargs="?", help="Repo path (e.g. Timmy_Foundation/timmy-config)")
    parser.add_argument("--org", help="Triage all repos in org (instead of single repo)")
    parser.add_argument("--close-stale", action="store_true",
                        help="Close PRs referencing closed issues")
    parser.add_argument("--dry-run", dest="dry_run", action="store_true", default=True,
                        help="Don't actually close (default)")
    # BUGFIX: previously dry_run was store_true with default=True, so it was
    # always True and --close-stale could never actually close anything.
    # --no-dry-run now enables real closes; plain --close-stale stays safe.
    parser.add_argument("--no-dry-run", dest="dry_run", action="store_false",
                        help="Actually close stale PRs (use with --close-stale)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--token", help="Gitea token (overrides config file)")
    args = parser.parse_args()

    token = args.token or get_token()

    # Determine repos to analyze
    repos = []
    if args.org:
        org_repos = api_get(f"/orgs/{args.org}/repos", token, {"limit": "50"})
        if org_repos:
            repos = [r["full_name"] for r in org_repos]
    elif args.repo:
        repos = [args.repo]
    else:
        parser.error("Provide REPO or --org")

    all_analyses = []
    for repo in repos:
        analysis = analyze_repo(repo, token)
        if "error" in analysis:
            print(f"SKIP: {analysis['error']}", file=sys.stderr)
            continue
        all_analyses.append(analysis)
        # Close stale if requested
        if args.close_stale and analysis["stale_prs"]:
            closed = close_stale_prs(analysis["stale_prs"], repo, token,
                                     dry_run=args.dry_run)
            analysis["closed_actions"] = closed

    # Output
    if args.json:
        output = format_json(all_analyses[0] if len(all_analyses) == 1 else all_analyses)
    else:
        parts = [format_report(a) for a in all_analyses]
        output = "\n\n---\n\n".join(parts)

    if args.output:
        Path(args.output).write_text(output, encoding="utf-8")
        print(f"Report written to {args.output}")
    else:
        print(output)

    # Exit 1 if any stale PRs found (CI mode)
    total_stale = sum(len(a.get("stale_prs", [])) for a in all_analyses)
    if total_stale > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()