From 0fcef1839e0b74cdc25cf7aa08902d98fae755f2 Mon Sep 17 00:00:00 2001
From: Alexander Whitestone
Date: Tue, 14 Apr 2026 22:20:06 -0400
Subject: [PATCH] feat: PR triage automation script (#659)

- scripts/pr_triage.py: fetch, categorize, deduplicate, report
- Categories: training-data, bug-fix, feature, maintenance, documentation, testing, infrastructure
- Duplicate detection: PRs referencing same issue
- Health checks: stale (>7d), closed issue refs, mergeable status
- Markdown report + JSON output
- 11 tests pass
---
 scripts/pr_triage.py    | 214 ++++++++++++++++++++++++++++++++++++++++
 tests/test_pr_triage.py |  57 +++++++++++
 2 files changed, 271 insertions(+)
 create mode 100755 scripts/pr_triage.py
 create mode 100644 tests/test_pr_triage.py

diff --git a/scripts/pr_triage.py b/scripts/pr_triage.py
new file mode 100755
index 00000000..76cee8b3
--- /dev/null
+++ b/scripts/pr_triage.py
@@ -0,0 +1,214 @@
+#!/usr/bin/env python3
+"""PR Triage Automation -- Categorize, deduplicate, report (#659)."""
+import argparse
+import json
+import os
+import re
+import subprocess
+import sys
+from collections import Counter, defaultdict
+from datetime import datetime
+from urllib.error import HTTPError
+from urllib.request import Request, urlopen
+
+
+def _token():
+    """Return the Gitea token from $GITEA_TOKEN, else ~/.config/gitea/token, else ''."""
+    t = os.environ.get("GITEA_TOKEN", "")
+    if not t:
+        p = os.path.expanduser("~/.config/gitea/token")
+        if os.path.exists(p):
+            # Context manager so the file handle is always closed.
+            with open(p) as f:
+                t = f.read().strip()
+    return t
+
+
+def _api(url, token, method="GET", data=None):
+    """Call a JSON endpoint; return the decoded body, or None on an HTTP error."""
+    h = {"Authorization": "token " + token, "Accept": "application/json"}
+    # Compare with None so an empty payload ({} or []) is still sent as JSON.
+    body = json.dumps(data).encode() if data is not None else None
+    if body is not None:
+        h["Content-Type"] = "application/json"
+    req = Request(url, data=body, headers=h, method=method)
+    try:
+        with urlopen(req, timeout=30) as resp:
+            return json.loads(resp.read())
+    except HTTPError as e:
+        # Report the failure; a silent None looks the same as an empty result.
+        print("API error " + str(e.code) + " for " + url, file=sys.stderr)
+        return None
+
+
+def fetch_prs(base, token, owner, repo):
+    """Return all open pull requests of owner/repo, following pagination."""
+    prs, page = [], 1
+    while True:
+        b = _api(base + "/api/v1/repos/" + owner + "/" + repo + "/pulls?state=open&limit=50&page=" + str(page), token)
+        if not b:
+            break
+        prs.extend(b)
+        if len(b) < 50:
+            break
+        page += 1
+    return prs
+
+
+def fetch_issues(base, token, owner, repo):
+    """Return open issues (pull requests excluded) of owner/repo, keyed by number."""
+    iss, page = {}, 1
+    while True:
+        b = _api(base + "/api/v1/repos/" + owner + "/" + repo + "/issues?state=open&limit=50&page=" + str(page), token)
+        if not b:
+            break
+        for i in b:
+            # Test the value, not key membership: the key may be present
+            # but null on a plain issue.
+            if not i.get("pull_request"):
+                iss[i["number"]] = i
+        if len(b) < 50:
+            break
+        page += 1
+    return iss
+
+
+def _text(pr):
+    """Return the PR title and body joined by a space; null fields count as empty."""
+    return (pr.get("title") or "") + " " + (pr.get("body") or "")
+
+
+def categorize(pr):
+    """Return the category of the first keyword found in the title, body or labels."""
+    labels = " ".join(l.get("name", "") for l in (pr.get("labels") or []))
+    c = (_text(pr) + " " + labels).lower()
+    # Order matters: the first matching keyword wins.
+    for kw, cat in [("training data", "training-data"), ("dpo", "training-data"), ("grpo", "training-data"),
+                    ("fix:", "bug-fix"), ("bug", "bug-fix"), ("hotfix", "bug-fix"),
+                    ("feat:", "feature"), ("feature", "feature"),
+                    ("refactor", "maintenance"), ("cleanup", "maintenance"),
+                    ("doc", "documentation"), ("test", "testing"), ("infra", "infrastructure")]:
+        if kw in c:
+            return cat
+    return "other"
+
+
+def refs(pr):
+    """Return the distinct issue numbers referenced as #N, in first-seen order."""
+    # dict.fromkeys drops repeats but keeps order, so a PR that mentions the
+    # same issue twice is not grouped with itself by find_duplicates().
+    return list(dict.fromkeys(int(m) for m in re.findall(r"#(\d+)", _text(pr))))
+
+
+def find_duplicates(prs):
+    """Return groups (lists) of PRs that reference the same issue number."""
+    by = defaultdict(list)
+    for p in prs:
+        for r in refs(p):
+            by[r].append(p)
+    return [g for g in by.values() if len(g) > 1]
+
+
+def health(pr, issues):
+    """Return a flat health record for one PR; `issues` holds the open issues by number."""
+    r = refs(pr)
+    created = datetime.fromisoformat(pr["created_at"].replace("Z", "+00:00"))
+    updated = datetime.fromisoformat(pr["updated_at"].replace("Z", "+00:00"))
+    now = datetime.now(created.tzinfo)
+    return {
+        "pr": pr["number"], "title": pr["title"], "head": pr["head"]["ref"],
+        "category": categorize(pr), "refs": r,
+        "open": [x for x in r if x in issues], "closed": [x for x in r if x not in issues],
+        "age": (now - created).days, "stale": (now - updated).days,
+        "mergeable": pr.get("mergeable"), "author": (pr.get("user") or {}).get("login", ""),
+    }
+
+
+def report(repo, checks, dups):
+    """Render the Markdown report: summary, duplicates, stale PRs, full table."""
+    lines = ["# PR Triage -- " + repo,
+             "Generated: " + datetime.now().strftime("%Y-%m-%d %H:%M"),
+             "Open PRs: " + str(len(checks)), "", "## Summary", ""]
+    cats = Counter(h["category"] for h in checks)
+    lines.append("| Category | Count |")
+    lines.append("|----------|-------|")
+    for c, n in cats.most_common():
+        lines.append("| " + c + " | " + str(n) + " |")
+    stale = [h for h in checks if h["stale"] > 7]
+    lines.extend(["", "Stale (>7d): " + str(len(stale)),
+                  "Duplicate groups: " + str(len(dups)), ""])
+    if dups:
+        lines.append("## Duplicates")
+        for g in dups:
+            rs = set()
+            for p in g:
+                rs.update(refs(p))
+            lines.append("Issues " + ", ".join("#" + str(r) for r in sorted(rs)) + ":")
+            for p in g:
+                lines.append(" - #" + str(p["number"]) + ": " + p["title"])
+            lines.append("")
+    if stale:
+        lines.append("## Stale (>7d)")
+        for h in sorted(stale, key=lambda x: x["stale"], reverse=True):
+            lines.append("- #" + str(h["pr"]) + ": " + h["title"] + " -- " + str(h["stale"]) + "d")
+        lines.append("")
+    lines.append("## All PRs")
+    lines.append("| # | Title | Category | Age | Stale | Merge |")
+    lines.append("|---|-------|----------|-----|-------|-------|")
+    for h in sorted(checks, key=lambda x: x["pr"]):
+        m = "Y" if h["mergeable"] else ("N" if h["mergeable"] is False else "?")
+        s = str(h["stale"]) + "d" if h["stale"] > 7 else "-"
+        # Escape pipes so a title cannot break the table row.
+        t = h["title"][:50].replace("|", "\\|")
+        lines.append("| " + str(h["pr"]) + " | " + t + " | " + h["category"]
+                     + " | " + str(h["age"]) + "d | " + s + " | " + m + " |")
+    return chr(10).join(lines)
+
+
+def main():
+    """CLI entry point: fetch PRs and issues, then print a Markdown or JSON report."""
+    p = argparse.ArgumentParser(description="PR Triage Automation")
+    p.add_argument("--base-url", default="https://forge.alexanderwhitestone.com")
+    p.add_argument("--owner", default="Timmy_Foundation")
+    p.add_argument("--repo", default="")
+    p.add_argument("--json", action="store_true", dest="js")
+    p.add_argument("--output", default="")
+    a = p.parse_args()
+    token = _token()
+    if not token:
+        print("No token", file=sys.stderr)
+        sys.exit(1)
+    repo = a.repo
+    if not repo:
+        try:
+            remote = subprocess.check_output(["git", "remote", "get-url", "origin"], text=True).strip()
+        except (OSError, subprocess.CalledProcessError):
+            # git missing or not a checkout: fall through to the error below.
+            remote = ""
+        m = re.search(r"[/:](\w[\w-]*)/(\w[\w-]*?)(?:\.git)?$", remote)
+        if m:
+            a.owner, repo = m.group(1), m.group(2)
+    if not repo:
+        print("No repo specified", file=sys.stderr)
+        sys.exit(1)
+    print("Triaging " + a.owner + "/" + repo + "...", file=sys.stderr)
+    prs = fetch_prs(a.base_url, token, a.owner, repo)
+    issues = fetch_issues(a.base_url, token, a.owner, repo)
+    checks = [health(pr, issues) for pr in prs]
+    dups = find_duplicates(prs)
+    if a.js:
+        r = json.dumps({"repo": repo, "prs": checks,
+                        "duplicates": [[{"number": p["number"], "title": p["title"]} for p in g] for g in dups]},
+                       indent=2)
+    else:
+        r = report(repo, checks, dups)
+    print(r)
+    if a.output:
+        # Honour --output in both Markdown and JSON modes.
+        with open(a.output, "w") as f:
+            f.write(r)
+    print("\n" + str(len(checks)) + " PRs, " + str(len(dups)) + " duplicate groups", file=sys.stderr)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/tests/test_pr_triage.py b/tests/test_pr_triage.py
new file mode 100644
index 00000000..c33cdd43
--- /dev/null
+++ b/tests/test_pr_triage.py
@@ -0,0 +1,57 @@
+"""Tests for PR triage automation (#659)."""
+import pytest
+
+class TestCategorize:
+    def _pr(self, title="", body=""):
+        return {"title": title, "body": body, "labels": []}
+
+    def test_training(self):
+        from scripts.pr_triage import categorize
+        assert categorize(self._pr("Add DPO pairs")) == "training-data"
+
+    def test_bug(self):
+        from scripts.pr_triage import categorize
+        assert categorize(self._pr("fix: crash")) == "bug-fix"
+
+    def test_feature(self):
+        from scripts.pr_triage import categorize
+        assert categorize(self._pr("feat: dark mode")) == "feature"
+
+    def test_other(self):
+        from scripts.pr_triage import categorize
+        assert categorize(self._pr("random")) == "other"
+
+    def test_null_fields(self):
+        from scripts.pr_triage import categorize
+        assert categorize({"title": "fix: crash", "body": None, "labels": None}) == "bug-fix"
+
+class TestRefs:
+    def test_simple(self):
+        from scripts.pr_triage import refs
+        assert 123 in refs({"title": "Fix #123", "body": ""})
+
+    def test_multiple(self):
+        from scripts.pr_triage import refs
+        r = refs({"title": "", "body": "Closes #100, Refs #200"})
+        assert 100 in r and 200 in r
+
+    def test_repeated(self):
+        from scripts.pr_triage import refs
+        assert refs({"title": "Fix #7", "body": "Closes #7"}) == [7]
+
+class TestDuplicates:
+    def test_found(self):
+        from scripts.pr_triage import find_duplicates
+        prs = [{"title": "", "body": "Fix #1", "number": 1, "head": {"ref": "a"}, "created_at": "2026-01-01T00:00:00Z", "updated_at": "2026-01-01T00:00:00Z", "user": {}},
+               {"title": "", "body": "Refs #1", "number": 2, "head": {"ref": "b"}, "created_at": "2026-01-01T00:00:00Z", "updated_at": "2026-01-01T00:00:00Z", "user": {}}]
+        assert len(find_duplicates(prs)) == 1
+
+    def test_none(self):
+        from scripts.pr_triage import find_duplicates
+        prs = [{"title": "", "body": "Fix #1", "number": 1, "head": {"ref": "a"}, "created_at": "2026-01-01T00:00:00Z", "updated_at": "2026-01-01T00:00:00Z", "user": {}},
+               {"title": "", "body": "Fix #2", "number": 2, "head": {"ref": "b"}, "created_at": "2026-01-01T00:00:00Z", "updated_at": "2026-01-01T00:00:00Z", "user": {}}]
+        assert find_duplicates(prs) == []
+
+    def test_repeated_ref_in_one_pr(self):
+        from scripts.pr_triage import find_duplicates
+        assert find_duplicates([{"title": "Fix #1", "body": "Closes #1", "number": 1}]) == []