Compare commits
2 Commits
feat/647-s
...
burn/658-p
| Author | SHA1 | Date | |
|---|---|---|---|
| f05c014143 | |||
| f094b0d5b5 |
290
scripts/pr_backlog_triage.py
Normal file
290
scripts/pr_backlog_triage.py
Normal file
@@ -0,0 +1,290 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
pr_backlog_triage.py — Automated PR backlog analysis for Gitea repos (Issue #658).
|
||||
|
||||
Analyzes open PRs: categorizes, finds duplicates, detects stale references
|
||||
to closed issues, and generates a triage report.
|
||||
|
||||
Usage:
|
||||
python3 scripts/pr_backlog_triage.py Timmy_Foundation/timmy-config
|
||||
python3 scripts/pr_backlog_triage.py Timmy_Foundation/hermes-agent --close-stale
|
||||
python3 scripts/pr_backlog_triage.py Timmy_Foundation/the-nexus --json
|
||||
python3 scripts/pr_backlog_triage.py --org Timmy_Foundation # All repos
|
||||
"""
|
||||
import argparse
import json
import os
import re
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple
from urllib.error import HTTPError
from urllib.parse import urlencode
from urllib.request import Request, urlopen
|
||||
|
||||
# Base URL of the Gitea instance all API calls are made against.
GITEA_URL = "https://forge.alexanderwhitestone.com"

# Matches "#123"-style issue references in PR titles and bodies.
ISSUE_PATTERN = re.compile(r"#(\d+)")

# Keyword lists used by categorize_pr(). Iteration order matters: the first
# category with a matching keyword wins, so e.g. "feat: 500 pairs" is
# classified as training_data, not feature.
CATEGORY_KEYWORDS = {
    "training_data": ["500", "pairs", "scene description", "lyrics", "prompt", "training data"],
    "bug_fix": ["fix", "bug", "patch", "hotfix", "resolve"],
    "feature": ["feat", "add", "implement", "feature"],
    "docs": ["doc", "readme", "changelog"],
    "ops": ["ops", "deploy", "ci", "cd", "pipeline"],
}
|
||||
|
||||
|
||||
def get_token() -> str:
    """Return the Gitea API token.

    Looks first at ~/.config/gitea/token, then at the GITEA_TOKEN
    environment variable. Prints an error to stderr and exits with
    status 1 when neither source yields a token.
    """
    token_file = Path(os.path.expanduser("~/.config/gitea/token"))
    if token_file.exists():
        return token_file.read_text().strip()
    env_token = os.environ.get("GITEA_TOKEN", "")
    if env_token:
        return env_token
    print("ERROR: No Gitea token found. Set GITEA_TOKEN or create ~/.config/gitea/token", file=sys.stderr)
    sys.exit(1)
|
||||
|
||||
|
||||
def api_get(path: str, token: str, params: Optional[dict] = None) -> Any:
    """GET a Gitea API endpoint and return the decoded JSON response.

    Args:
        path: API path relative to /api/v1, e.g. "/repos/org/repo/pulls".
        token: Gitea API token, sent as an Authorization header.
        params: Optional query parameters; values are URL-encoded.

    Returns:
        The parsed JSON body, or None when the endpoint returns HTTP 404.

    Raises:
        HTTPError: For any non-404 HTTP error.
    """
    url = f"{GITEA_URL}/api/v1{path}"
    if params:
        # urlencode() escapes reserved characters; the previous manual
        # "&".join(f"{k}={v}") produced invalid URLs for values containing
        # '&', '=', '#', or spaces.
        url = f"{url}?{urlencode(params)}"
    req = Request(url, headers={"Authorization": f"token {token}"})
    try:
        return json.loads(urlopen(req, timeout=30).read())
    except HTTPError as e:
        if e.code == 404:
            return None
        raise
|
||||
|
||||
|
||||
def api_patch(path: str, token: str, data: dict) -> Any:
    """PATCH `data` as JSON to a Gitea API endpoint.

    Returns the parsed JSON response, or None on any HTTP error.
    """
    headers = {
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
    }
    req = Request(
        f"{GITEA_URL}/api/v1{path}",
        data=json.dumps(data).encode(),
        headers=headers,
        method="PATCH",
    )
    try:
        return json.loads(urlopen(req, timeout=30).read())
    except HTTPError:
        return None
|
||||
|
||||
|
||||
def api_post(path: str, token: str, data: dict) -> Any:
    """POST `data` as JSON to a Gitea API endpoint.

    Returns the parsed JSON response, or None on any HTTP error.
    """
    headers = {
        "Authorization": f"token {token}",
        "Content-Type": "application/json",
    }
    req = Request(
        f"{GITEA_URL}/api/v1{path}",
        data=json.dumps(data).encode(),
        headers=headers,
        method="POST",
    )
    try:
        return json.loads(urlopen(req, timeout=30).read())
    except HTTPError:
        return None
|
||||
|
||||
|
||||
def categorize_pr(pr: dict) -> str:
    """Classify a PR by its (lower-cased) title against CATEGORY_KEYWORDS.

    Returns the first category whose keyword list matches, or "other"
    when no keyword is found (including a missing or empty title).
    """
    title = (pr.get("title") or "").lower()
    return next(
        (cat for cat, kws in CATEGORY_KEYWORDS.items() if any(kw in title for kw in kws)),
        "other",
    )
|
||||
|
||||
|
||||
def extract_refs(pr: dict) -> List[int]:
    """Return the sorted, de-duplicated issue numbers ("#N") found in a PR's title and body."""
    title = pr.get("title") or ""
    body = pr.get("body") or ""
    refs = {int(num) for num in re.findall(r"#(\d+)", f"{title} {body}")}
    return sorted(refs)
|
||||
|
||||
|
||||
def find_duplicates(prs: List[dict]) -> Dict[int, List[int]]:
    """Map each issue number referenced by MORE than one open PR to those PR numbers.

    Issues referenced by exactly one PR are dropped from the result.
    """
    by_issue: Dict[int, List[int]] = {}
    for pr in prs:
        number = pr["number"]
        for issue in extract_refs(pr):
            if issue in by_issue:
                by_issue[issue].append(number)
            else:
                by_issue[issue] = [number]
    return {issue: nums for issue, nums in by_issue.items() if len(nums) > 1}
|
||||
|
||||
|
||||
def find_stale(prs: List[dict], closed_issue_nums: set) -> List[dict]:
    """Return one record per PR that references at least one closed issue.

    Each record has the shape {"pr": <pr number>, "closed_refs": [<issue>, ...]}.
    """
    results = []
    for pr in prs:
        hits = [ref for ref in extract_refs(pr) if ref in closed_issue_nums]
        if hits:
            results.append({"pr": pr["number"], "closed_refs": hits})
    return results
|
||||
|
||||
|
||||
def analyze_repo(repo: str, token: str, limit: int = 100) -> dict:
    """Run the full triage analysis for one repo.

    Args:
        repo: "owner/name" repo path.
        token: Gitea API token.
        limit: Maximum open PRs to fetch (single page; no pagination).

    Returns:
        A dict with total counts, per-category PR details, duplicate
        groupings and stale PRs — or {"error": ...} when the PR list
        cannot be fetched.
    """
    # Fetch open PRs; api_get returns None on 404 (repo missing/inaccessible).
    prs = api_get(f"/repos/{repo}/pulls", token, {"state": "open", "limit": str(limit)})
    if prs is None:
        return {"error": f"Could not fetch PRs for {repo}"}

    # Fetch closed issues for stale-reference detection (capped at 200,
    # hence the "closed_issues_checked" count in the result).
    closed_issues = api_get(f"/repos/{repo}/issues", token, {"state": "closed", "limit": "200"})
    closed_nums = set()
    if closed_issues:
        # Entries carrying a "pull_request" key are PRs, not issues — skip them.
        closed_nums = {iss["number"] for iss in closed_issues if not iss.get("pull_request")}

    # Bucket every open PR by title keywords (see CATEGORY_KEYWORDS),
    # keeping a compact summary record per PR.
    categories: Dict[str, List[dict]] = {}
    for pr in prs:
        cat = categorize_pr(pr)
        categories.setdefault(cat, []).append({
            "number": pr["number"],
            "title": pr.get("title", ""),
            "head": pr.get("head", {}).get("ref", ""),
            "refs": extract_refs(pr),
            "additions": pr.get("additions", 0),
            "deletions": pr.get("deletions", 0),
            "changed_files": pr.get("changed_files", 0),
            "created": pr.get("created_at", ""),
        })

    duplicates = find_duplicates(prs)
    stale = find_stale(prs, closed_nums)

    return {
        "repo": repo,
        "total_open": len(prs),
        "categories": {k: len(v) for k, v in categories.items()},
        "category_details": categories,
        "duplicates": duplicates,
        "stale_prs": stale,
        "closed_issues_checked": len(closed_nums),
    }
|
||||
|
||||
|
||||
def close_stale_prs(stale: List[dict], repo: str, token: str, dry_run: bool = True) -> List[dict]:
    """Close (or, when dry_run, merely report) PRs that reference closed issues.

    Args:
        stale: Output of find_stale(): [{"pr": n, "closed_refs": [...]}].
        repo: "owner/name" repo path.
        token: Gitea API token.
        dry_run: When True (the default), take no API action at all.

    Returns:
        One action record per input entry:
        {"pr": n, "action": "closed" | "would_close", "refs": [...]}.
    """
    actions = []
    for entry in stale:
        number = entry["pr"]
        refs = entry["closed_refs"]
        if dry_run:
            actions.append({"pr": number, "action": "would_close", "refs": refs})
            continue
        # Leave an explanatory comment on the PR before closing it.
        api_post(f"/repos/{repo}/issues/{number}/comments", token, {
            "body": f"Closing: references closed issue(s) {', '.join(f'#{r}' for r in refs)}. Triage cleanup."
        })
        api_patch(f"/repos/{repo}/pulls/{number}", token, {"state": "closed"})
        actions.append({"pr": number, "action": "closed", "refs": refs})
    return actions
|
||||
|
||||
|
||||
def format_report(analysis: dict) -> str:
    """Render a triage analysis dict (from analyze_repo) as a markdown report."""
    out: List[str] = [
        f"## PR Backlog Triage — {analysis['repo']}",
        "",
        f"**Total open PRs:** {analysis['total_open']}",
        f"**Closed issues checked:** {analysis['closed_issues_checked']}",
        "",
        "### Categories",
        "",
        "| Category | Count |",
        "|----------|-------|",
    ]
    out.extend(f"| {cat} | {count} |" for cat, count in sorted(analysis["categories"].items()))

    # Issues referenced by more than one PR.
    dupes = analysis["duplicates"]
    if dupes:
        out += ["", "### Duplicate PRs (same issue referenced)", ""]
        out += [f"- Issue #{issue}: PRs {pr_nums}" for issue, pr_nums in dupes.items()]

    # PRs pointing at issues that are already closed.
    stale = analysis["stale_prs"]
    if stale:
        out += ["", "### Stale PRs (reference closed issues)", ""]
        for entry in stale:
            refs = ", ".join(f"#{r}" for r in entry["closed_refs"])
            out.append(f"- PR #{entry['pr']}: references closed {refs}")

    # One section per non-empty category, listing each PR (title truncated to 70 chars).
    for cat, items in analysis.get("category_details", {}).items():
        if not items:
            continue
        out += ["", f"### {cat.replace('_', ' ').title()} ({len(items)})", ""]
        for pr in items:
            refs_str = f" (refs: {', '.join(f'#{r}' for r in pr['refs'])})" if pr["refs"] else ""
            out.append(f"- #{pr['number']}: {pr['title'][:70]}{refs_str}")

    return "\n".join(out)
|
||||
|
||||
|
||||
def format_json(analysis: dict) -> str:
    """Serialize the analysis as pretty-printed JSON.

    Non-JSON-serializable values (if any) are stringified via `default=str`.
    """
    return json.dumps(analysis, default=str, indent=2)
|
||||
|
||||
|
||||
def main():
    """CLI entry point: triage a single repo or every repo in an org.

    Exits with status 1 when any stale PRs are found (CI mode), 0 otherwise.
    """
    parser = argparse.ArgumentParser(description="PR backlog triage for Gitea repos")
    parser.add_argument("repo", nargs="?", help="Repo path (e.g. Timmy_Foundation/timmy-config)")
    parser.add_argument("--org", help="Triage all repos in org (instead of single repo)")
    parser.add_argument("--close-stale", action="store_true", help="Close PRs referencing closed issues")
    # Kept for backward compatibility: a store_true flag with default=True is
    # always True, so passing --dry-run was (and remains) a no-op.
    parser.add_argument("--dry-run", action="store_true", default=True, help="Don't actually close (default)")
    # BUG FIX: --dry-run could never be turned off, so --close-stale could
    # never actually close anything. --execute is the explicit opt-in.
    parser.add_argument("--execute", action="store_true",
                        help="Actually close stale PRs (disables the default dry run)")
    parser.add_argument("--json", action="store_true", help="Output as JSON")
    parser.add_argument("--output", help="Write report to file")
    parser.add_argument("--token", help="Gitea token (overrides config file)")
    args = parser.parse_args()

    token = args.token or get_token()

    # Determine repos to analyze: all org repos, or the single positional repo.
    repos = []
    if args.org:
        org_repos = api_get(f"/orgs/{args.org}/repos", token, {"limit": "50"})
        if org_repos:
            repos = [r["full_name"] for r in org_repos]
    elif args.repo:
        repos = [args.repo]
    else:
        parser.error("Provide REPO or --org")

    all_analyses = []
    for repo in repos:
        analysis = analyze_repo(repo, token)
        if "error" in analysis:
            print(f"SKIP: {analysis['error']}", file=sys.stderr)
            continue
        all_analyses.append(analysis)

        # Close stale PRs if requested; dry run unless --execute was given.
        if args.close_stale and analysis["stale_prs"]:
            closed = close_stale_prs(analysis["stale_prs"], repo, token,
                                     dry_run=not args.execute)
            analysis["closed_actions"] = closed

    # Render output: a single analysis is emitted unwrapped in JSON mode.
    if args.json:
        output = format_json(all_analyses[0] if len(all_analyses) == 1 else all_analyses)
    else:
        output = "\n\n---\n\n".join(format_report(a) for a in all_analyses)

    if args.output:
        Path(args.output).write_text(output, encoding="utf-8")
        print(f"Report written to {args.output}")
    else:
        print(output)

    # Exit 1 if any stale PRs found (CI mode)
    if any(a.get("stale_prs") for a in all_analyses):
        sys.exit(1)
|
||||
|
||||
|
||||
# Script entry point (see usage examples in the module docstring).
if __name__ == "__main__":
    main()
|
||||
188
tests/test_pr_backlog_triage.py
Normal file
188
tests/test_pr_backlog_triage.py
Normal file
@@ -0,0 +1,188 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Tests for pr_backlog_triage.py — issue #658."""
|
||||
import json
|
||||
import sys
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
sys.path.insert(0, str(Path(__file__).resolve().parent.parent / "scripts"))
|
||||
from pr_backlog_triage import (
|
||||
categorize_pr,
|
||||
extract_refs,
|
||||
find_duplicates,
|
||||
find_stale,
|
||||
format_report,
|
||||
format_json,
|
||||
)
|
||||
|
||||
|
||||
class TestCategorize:
    """categorize_pr(): keyword-based classification of PR titles."""

    def test_training_data(self):
        assert categorize_pr({"title": "feat: 500 emotional weather pairs (#603)"}) == "training_data"

    def test_scene_description(self):
        assert categorize_pr({"title": "feat: 100 jazz scene descriptions (#612)"}) == "training_data"

    def test_bug_fix(self):
        assert categorize_pr({"title": "fix: broken import in cli.py"}) == "bug_fix"

    def test_feature(self):
        assert categorize_pr({"title": "feat: add token budget tracker"}) == "feature"

    def test_docs(self):
        assert categorize_pr({"title": "docs: update README with new config format"}) == "docs"

    def test_ops(self):
        assert categorize_pr({"title": "ops: deploy config to Ezra VPS"}) == "ops"

    def test_other(self):
        assert categorize_pr({"title": "chore: cleanup whitespace"}) == "other"

    def test_case_insensitive(self):
        assert categorize_pr({"title": "FIX: resolve import error"}) == "bug_fix"

    def test_empty_title(self):
        assert categorize_pr({"title": ""}) == "other"

    def test_none_title(self):
        # Missing "title" key must behave like an empty title.
        assert categorize_pr({}) == "other"
|
||||
|
||||
|
||||
class TestExtractRefs:
    """extract_refs(): issue-number extraction from PR title + body."""

    def test_single_ref(self):
        assert extract_refs({"title": "Fix #123", "body": "Closes #123"}) == [123]

    def test_multiple_refs(self):
        assert extract_refs({"title": "Fix #123", "body": "Related to #456 and #789"}) == [123, 456, 789]

    def test_deduplication(self):
        # Same number in title and body collapses to one entry.
        assert extract_refs({"title": "#100", "body": "Fixes #100"}) == [100]

    def test_no_refs(self):
        assert extract_refs({"title": "No issue here", "body": "Just a PR"}) == []

    def test_empty_body(self):
        # A None body must not raise.
        assert extract_refs({"title": "Fix #42", "body": None}) == [42]
|
||||
|
||||
|
||||
class TestFindDuplicates:
    """find_duplicates(): grouping open PRs by shared issue references."""

    def test_no_duplicates(self):
        prs = [
            {"number": 1, "title": "Fix #10", "body": ""},
            {"number": 2, "title": "Fix #11", "body": ""},
        ]
        assert find_duplicates(prs) == {}

    def test_duplicates_found(self):
        prs = [
            {"number": 1, "title": "Fix #10", "body": ""},
            {"number": 2, "title": "Also fix #10", "body": ""},
        ]
        assert find_duplicates(prs) == {10: [1, 2]}

    def test_triple_duplicate(self):
        prs = [{"number": n, "title": "#42", "body": ""} for n in (1, 2, 3)]
        assert len(find_duplicates(prs)[42]) == 3
|
||||
|
||||
|
||||
class TestFindStale:
    """find_stale(): detection of PRs that reference closed issues."""

    def test_no_stale(self):
        prs = [{"number": 1, "title": "Fix #10", "body": ""}]
        assert find_stale(prs, set()) == []

    def test_stale_found(self):
        prs = [{"number": 1, "title": "Fix #10", "body": ""}]
        assert find_stale(prs, {10}) == [{"pr": 1, "closed_refs": [10]}]

    def test_mixed_refs(self):
        # Only the closed reference is reported; the open one is ignored.
        prs = [{"number": 1, "title": "Fix #10 and #20", "body": ""}]
        assert find_stale(prs, {10})[0]["closed_refs"] == [10]
|
||||
|
||||
|
||||
class TestFormatReport:
    """format_report(): markdown rendering of an analysis dict."""

    @staticmethod
    def _analysis(**overrides):
        # Minimal valid analysis dict; each test overrides only the
        # fields it exercises.
        base = {
            "repo": "test/repo",
            "total_open": 0,
            "categories": {},
            "category_details": {},
            "duplicates": {},
            "stale_prs": [],
            "closed_issues_checked": 0,
        }
        base.update(overrides)
        return base

    def test_basic_report(self):
        report = format_report(self._analysis(
            total_open=5,
            categories={"feature": 3, "bug_fix": 2},
            category_details={
                "feature": [
                    {"number": 1, "title": "feat: x", "refs": [10], "head": "f1",
                     "additions": 10, "deletions": 5, "changed_files": 2, "created": "2026-04-01"}
                ],
                "bug_fix": [],
            },
            closed_issues_checked=100,
        ))
        assert "test/repo" in report
        assert "5" in report
        assert "feature" in report

    def test_stale_in_report(self):
        report = format_report(self._analysis(
            total_open=1,
            categories={"feature": 1},
            stale_prs=[{"pr": 5, "closed_refs": [10]}],
            closed_issues_checked=50,
        ))
        assert "#5" in report
        assert "#10" in report

    def test_duplicates_in_report(self):
        report = format_report(self._analysis(
            total_open=2,
            categories={"bug_fix": 2},
            duplicates={42: [1, 2]},
        ))
        assert "Duplicate" in report
        assert "#42" in report
|
||||
|
||||
|
||||
class TestFormatJson:
    """format_json(): JSON serialization of an analysis dict."""

    def test_valid_json(self):
        out = format_json({"repo": "test", "total_open": 0})
        # Output must round-trip through the JSON parser.
        assert json.loads(out)["repo"] == "test"
|
||||
Reference in New Issue
Block a user