Files
timmy-config/scripts/pr_backlog_triage.py

291 lines
10 KiB
Python

#!/usr/bin/env python3
"""
pr_backlog_triage.py — Automated PR backlog analysis for Gitea repos (Issue #658).
Analyzes open PRs: categorizes, finds duplicates, detects stale references
to closed issues, and generates a triage report.
Usage:
python3 scripts/pr_backlog_triage.py Timmy_Foundation/timmy-config
python3 scripts/pr_backlog_triage.py Timmy_Foundation/hermes-agent --close-stale
python3 scripts/pr_backlog_triage.py Timmy_Foundation/the-nexus --json
python3 scripts/pr_backlog_triage.py --org Timmy_Foundation # All repos
"""
import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
# Base URL of the Gitea instance every API call targets.
GITEA_URL = "https://forge.alexanderwhitestone.com"
# Matches "#123"-style issue references in PR titles and bodies.
ISSUE_PATTERN = re.compile(r"#(\d+)")
# Keyword lists used to bucket PRs by title. Dict order matters: the first
# category whose keywords match wins (see categorize_pr).
CATEGORY_KEYWORDS = {
    "training_data": ["500", "pairs", "scene description", "lyrics", "prompt", "training data"],
    "bug_fix": ["fix", "bug", "patch", "hotfix", "resolve"],
    "feature": ["feat", "add", "implement", "feature"],
    "docs": ["doc", "readme", "changelog"],
    "ops": ["ops", "deploy", "ci", "cd", "pipeline"],
}
def get_token() -> str:
    """Return the Gitea API token.

    Lookup order: ``~/.config/gitea/token`` first, then the ``GITEA_TOKEN``
    environment variable. Exits with status 1 if neither is available.
    """
    token_file = Path(os.path.expanduser("~/.config/gitea/token"))
    if token_file.exists():
        return token_file.read_text().strip()
    env_token = os.environ.get("GITEA_TOKEN", "")
    if env_token:
        return env_token
    print("ERROR: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token", file=sys.stderr)
    sys.exit(1)
def api_get(path: str, token: str, params: Optional[dict] = None) -> Any:
    """GET a Gitea API endpoint and return the decoded JSON body.

    Args:
        path: API path under ``/api/v1`` (e.g. ``/repos/{owner}/{repo}/pulls``).
        token: Gitea access token.
        params: Optional query parameters, properly URL-encoded.

    Returns:
        Parsed JSON payload, or ``None`` on HTTP 404. Other HTTP errors
        propagate to the caller.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    if params:
        # BUG FIX: the previous string join did not escape reserved or space
        # characters in values; urlencode produces a correct query string.
        url = f"{url}?{urlencode(params)}"
    req = Request(url, headers={"Authorization": f"token {token}"})
    try:
        return json.loads(urlopen(req, timeout=30).read())
    except HTTPError as e:
        if e.code == 404:
            # Missing resources are a normal condition for triage; signal
            # with None instead of raising.
            return None
        raise
def api_patch(path: str, token: str, data: dict) -> Any:
    """PATCH ``data`` as JSON to a Gitea API endpoint.

    Best-effort: returns the decoded response body, or ``None`` when the
    server answers with any HTTP error.
    """
    payload = json.dumps(data).encode()
    headers = {
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
    }
    request = Request(f"{GITEA_URL}/api/v1{path}", data=payload, headers=headers, method="PATCH")
    try:
        response = urlopen(request, timeout=30)
        return json.loads(response.read())
    except HTTPError:
        return None
def api_post(path: str, token: str, data: dict) -> Any:
    """POST ``data`` as JSON to a Gitea API endpoint.

    Best-effort: returns the decoded response body, or ``None`` when the
    server answers with any HTTP error.
    """
    encoded = json.dumps(data).encode()
    request = Request(
        f"{GITEA_URL}/api/v1{path}",
        data=encoded,
        headers={
            "Authorization": f"token {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )
    try:
        return json.loads(urlopen(request, timeout=30).read())
    except HTTPError:
        return None
def categorize_pr(pr: dict) -> str:
    """Return the first category whose keywords appear in the PR title.

    Falls back to "other" when no keyword list matches.
    """
    title = (pr.get("title") or "").lower()
    return next(
        (
            category
            for category, keywords in CATEGORY_KEYWORDS.items()
            if any(keyword in title for keyword in keywords)
        ),
        "other",
    )
def extract_refs(pr: dict) -> List[int]:
    """Return sorted, de-duplicated issue numbers (``#123``) from the PR title and body."""
    title = pr.get("title") or ""
    body = pr.get("body") or ""
    matches = ISSUE_PATTERN.findall(f"{title} {body}")
    return sorted({int(num) for num in matches})
def find_duplicates(prs: List[dict]) -> Dict[int, List[int]]:
    """Map issue number -> PR numbers, keeping only issues referenced by two or more PRs."""
    by_issue: Dict[int, List[int]] = {}
    for pr in prs:
        pr_number = pr["number"]
        for issue in extract_refs(pr):
            by_issue.setdefault(issue, []).append(pr_number)
    return {issue: nums for issue, nums in by_issue.items() if len(nums) > 1}
def find_stale(prs: List[dict], closed_issue_nums: set) -> List[dict]:
    """Return one entry per PR that references at least one closed issue.

    Each entry is ``{"pr": <pr number>, "closed_refs": [<issue numbers>]}``.
    """
    results: List[dict] = []
    for pr in prs:
        hits = [ref for ref in extract_refs(pr) if ref in closed_issue_nums]
        if hits:
            results.append({"pr": pr["number"], "closed_refs": hits})
    return results
def analyze_repo(repo: str, token: str, limit: int = 100) -> dict:
    """Run the full triage analysis for a single repository.

    Fetches open PRs and recently closed issues, buckets PRs by title
    keyword, and flags duplicate and stale PRs.

    Returns a summary dict, or ``{"error": ...}`` when PRs can't be fetched.
    """
    open_prs = api_get(f"/repos/{repo}/pulls", token, {"state": "open", "limit": str(limit)})
    if open_prs is None:
        return {"error": f"Could not fetch PRs for {repo}"}

    # Closed issue numbers are needed to spot PRs that point at finished work.
    # Gitea's /issues endpoint also returns pull requests; filter those out.
    closed = api_get(f"/repos/{repo}/issues", token, {"state": "closed", "limit": "200"})
    closed_nums = {iss["number"] for iss in closed if not iss.get("pull_request")} if closed else set()

    # Bucket a compact summary of each PR under its keyword category.
    buckets: Dict[str, List[dict]] = {}
    for pr in open_prs:
        summary = {
            "number": pr["number"],
            "title": pr.get("title", ""),
            "head": pr.get("head", {}).get("ref", ""),
            "refs": extract_refs(pr),
            "additions": pr.get("additions", 0),
            "deletions": pr.get("deletions", 0),
            "changed_files": pr.get("changed_files", 0),
            "created": pr.get("created_at", ""),
        }
        buckets.setdefault(categorize_pr(pr), []).append(summary)

    return {
        "repo": repo,
        "total_open": len(open_prs),
        "categories": {cat: len(items) for cat, items in buckets.items()},
        "category_details": buckets,
        "duplicates": find_duplicates(open_prs),
        "stale_prs": find_stale(open_prs, closed_nums),
        "closed_issues_checked": len(closed_nums),
    }
def close_stale_prs(stale: List[dict], repo: str, token: str, dry_run: bool = True) -> List[dict]:
    """Close (or, by default, only report) PRs that reference closed issues.

    With ``dry_run=True`` no API calls are made and each entry carries
    action "would_close"; otherwise a comment is posted and the PR is
    closed, recorded as action "closed".
    """
    actions: List[dict] = []
    for entry in stale:
        number = entry["pr"]
        refs = entry["closed_refs"]
        if dry_run:
            actions.append({"pr": number, "action": "would_close", "refs": refs})
            continue
        ref_list = ', '.join(f'#{r}' for r in refs)
        # Leave an explanatory comment first so the closure is traceable.
        api_post(f"/repos/{repo}/issues/{number}/comments", token, {
            "body": f"Closing: references closed issue(s) {ref_list}. Triage cleanup."
        })
        api_patch(f"/repos/{repo}/pulls/{number}", token, {"state": "closed"})
        actions.append({"pr": number, "action": "closed", "refs": refs})
    return actions
def format_report(analysis: dict) -> str:
    """Render a triage analysis dict as a markdown report string."""
    out = [
        f"## PR Backlog Triage — {analysis['repo']}",
        "",
        f"**Total open PRs:** {analysis['total_open']}",
        f"**Closed issues checked:** {analysis['closed_issues_checked']}",
        "",
        "### Categories",
        "",
        "| Category | Count |",
        "|----------|-------|",
    ]
    out.extend(f"| {cat} | {count} |" for cat, count in sorted(analysis["categories"].items()))

    # Issues referenced by more than one open PR.
    dupes = analysis["duplicates"]
    if dupes:
        out += ["", "### Duplicate PRs (same issue referenced)", ""]
        out.extend(f"- Issue #{issue}: PRs {nums}" for issue, nums in dupes.items())

    # PRs that reference already-closed issues.
    stale = analysis["stale_prs"]
    if stale:
        out += ["", "### Stale PRs (reference closed issues)", ""]
        for entry in stale:
            refs = ', '.join(f'#{r}' for r in entry['closed_refs'])
            out.append(f"- PR #{entry['pr']}: references closed {refs}")

    # One section per non-empty category, listing each PR (title truncated).
    for cat, items in analysis.get("category_details", {}).items():
        if not items:
            continue
        out += ["", f"### {cat.replace('_', ' ').title()} ({len(items)})", ""]
        for pr in items:
            suffix = f" (refs: {', '.join(f'#{r}' for r in pr['refs'])})" if pr["refs"] else ""
            out.append(f"- #{pr['number']}: {pr['title'][:70]}{suffix}")
    return "\n".join(out)
def format_json(analysis: dict) -> str:
    """Serialize the analysis as pretty-printed JSON (non-JSON types fall back to str())."""
    return json.dumps(analysis, default=str, indent=2)
def main():
    """CLI entry point: analyze one repo or a whole org, emit a report, exit 1 if stale PRs exist."""
    parser = argparse.ArgumentParser(description="PR backlog triage for Gitea repos")
    parser.add_argument("repo", nargs="?", help="Repo path (e.g. Timmy_Foundation/timmy-config)")
    parser.add_argument("--org", help="Triage all repos in org (instead of single repo)")
    parser.add_argument("--close-stale", action="store_true", help="Close PRs referencing closed issues")
    # BUG FIX: the old `--dry-run` flag was `action="store_true", default=True`,
    # which made it unconditionally True — `--close-stale` could never actually
    # close anything. `--dry-run` is kept as an explicit no-op for backward
    # compatibility; `--execute` now opts in to real closures.
    parser.add_argument("--dry-run", action="store_true", default=True,
                        help="Don't actually close (default)")
    parser.add_argument("--execute", action="store_true",
                        help="Actually close stale PRs (required for --close-stale to take effect)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--token", help="Gitea token (overrides config file)")
    args = parser.parse_args()

    token = args.token or get_token()

    # Resolve the list of repos to analyze: a whole org, or one explicit repo.
    if args.org:
        org_repos = api_get(f"/orgs/{args.org}/repos", token, {"limit": "50"})
        repos = [r["full_name"] for r in org_repos] if org_repos else []
        if not repos:
            # Previously an unknown/empty org produced an empty report and
            # exit 0; fail loudly instead.
            print(f"ERROR: no repos found for org {args.org}", file=sys.stderr)
            sys.exit(1)
    elif args.repo:
        repos = [args.repo]
    else:
        parser.error("Provide REPO or --org")

    all_analyses = []
    for repo in repos:
        analysis = analyze_repo(repo, token)
        if "error" in analysis:
            print(f"SKIP: {analysis['error']}", file=sys.stderr)
            continue
        all_analyses.append(analysis)
        # Close stale PRs only when asked; dry-run unless --execute is given.
        if args.close_stale and analysis["stale_prs"]:
            closed = close_stale_prs(analysis["stale_prs"], repo, token,
                                     dry_run=not args.execute)
            analysis["closed_actions"] = closed

    # Single repo → bare object; multiple → list. Markdown reports are joined
    # with a horizontal rule.
    if args.json:
        output = format_json(all_analyses[0] if len(all_analyses) == 1 else all_analyses)
    else:
        output = "\n\n---\n\n".join(format_report(a) for a in all_analyses)

    if args.output:
        Path(args.output).write_text(output, encoding="utf-8")
        print(f"Report written to {args.output}")
    else:
        print(output)

    # CI mode: nonzero exit when any stale PRs remain.
    if sum(len(a.get("stale_prs", [])) for a in all_analyses) > 0:
        sys.exit(1)


if __name__ == "__main__":
    main()