Merge PR #665: scripts/pr_triage.py

This commit is contained in:
Merge Bot
2026-04-16 05:10:46 +00:00
parent be49b46c4c
commit 5ac19b27ee

View File

@@ -1,362 +1,271 @@
#!/usr/bin/env python3
"""PR Triage Automation -- categorize, deduplicate, report, auto-merge (#659).

Enhancements over the base implementation:
- Auto-merge for safe PRs (training data with passing tests)
- --all-repos flag for org-wide triage
- JSON output with structured data
- Age-based risk scoring
- Better duplicate detection (title similarity)
- Tests in tests/test_pr_triage.py

Usage:
    python scripts/pr_triage.py --repo hermes-agent
    python scripts/pr_triage.py --repo hermes-agent --json
    python scripts/pr_triage.py --repo hermes-agent --auto-merge --dry-run
    python scripts/pr_triage.py --all-repos --owner Timmy_Foundation
    python scripts/pr_triage.py                     # Generate report (repo from git remote)
"""
import argparse
import json
import os
import re
import sys
import subprocess
from collections import Counter, defaultdict
from collections import Counter
from datetime import datetime, timezone
from difflib import SequenceMatcher
from urllib.request import Request, urlopen
from urllib.error import HTTPError
from pathlib import Path
from typing import Any, Optional
try:
import urllib.request
except ImportError:
print("Error: urllib not available")
sys.exit(1)
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
# Gitea API root; override with GITEA_API_BASE to point at another forge.
GITEA_BASE = os.environ.get("GITEA_API_BASE", "https://forge.alexanderwhitestone.com/api/v1")
# Token file read by _load_token(); overridable via GITEA_TOKEN_PATH.
TOKEN_PATH = os.environ.get("GITEA_TOKEN_PATH", str(Path.home() / ".config/gitea/token"))
# Organization whose repos are triaged by the api_get()-based helpers.
ORG = "Timmy_Foundation"
# NOTE(review): DEFAULT_REPOS appears unused in the visible code -- confirm
# whether a caller elsewhere relies on it before removing.
DEFAULT_REPOS = [
    "timmy-home",
    "hermes-agent",
    "timmy-config",
    "the-nexus",
    "the-door",
    "burn-fleet",
    "second-son-of-timmy",
]
# ---------------------------------------------------------------------------
# Categories
# ---------------------------------------------------------------------------
# Ordered mapping of category name -> regex patterns, matched (case-
# insensitively) against a PR's combined title+body by categorize_pr().
# The first category with a matching pattern wins; "other" is the explicit
# fallback and carries no patterns.
CATEGORY_RULES = {
    "training-data": [
        r"training[- ]?data", r"scene[- ]?description", r"dpo", r"training",
        r"batch[- ]?\d+", r"training[- ]?pipeline", r"jsonl",
    ],
    "bug-fix": [
        r"^fix[\(:]", r"\[BUG\]", r"\[FIX\]", r"bug fix", r"fixes #\d+",
        r"closes #\d+", r"broken", r"crash", r"regression",
    ],
    "feature": [
        r"^feat[\(:]", r"\[FEAT\]", r"\[FEATURE\]", r"new feature",
        r"add .+ support", r"implement",
    ],
    "docs": [
        r"^docs[\(:]", r"documentation", r"readme", r"genome",
    ],
    "security": [
        r"\[SECURITY\]", r"\[VITALIK\]", r"shield", r"injection",
        r"vulnerability", r"hardening",
    ],
    "infra": [
        r"\[INFRA\]", r"deploy", r"ansible", r"docker", r"ci[/ ]cd",
        r"cron", r"watchdog", r"systemd",
    ],
    "research": [
        r"research", r"benchmark", r"evaluation", r"analysis",
        r"\[BIG-BRAIN\]", r"investigate",
    ],
    "other": [],  # fallback
}
def _token():
t = os.environ.get("GITEA_TOKEN", "")
if not t:
p = os.path.expanduser("~/.config/gitea/token")
if os.path.exists(p):
t = open(p).read().strip()
return t
def _api(url, token, method="GET", data=None):
    """Call a Gitea endpoint and return the parsed JSON, or None on HTTP error.

    data, when given, is JSON-encoded as the request body with the matching
    Content-Type header. Fixes: test `data is not None` (an explicitly empty
    payload {} was previously dropped because it is falsy) and close the
    response handle via a context manager.
    """
    headers = {"Authorization": "token " + token, "Accept": "application/json"}
    body = None
    if data is not None:
        body = json.dumps(data).encode()
        headers["Content-Type"] = "application/json"
    req = Request(url, data=body, headers=headers, method=method)
    try:
        with urlopen(req, timeout=30) as resp:
            return json.loads(resp.read())
    except HTTPError:
        # Callers treat None as "no result" (pagination stops, merge failed).
        return None
def fetch_prs(base, token, owner, repo):
    """Return every open pull request for owner/repo, following pagination."""
    collected = []
    page = 1
    while True:
        url = f"{base}/api/v1/repos/{owner}/{repo}/pulls?state=open&limit=50&page={page}"
        batch = _api(url, token)
        if not batch:
            break
        collected.extend(batch)
        # A short page means we have reached the end.
        if len(batch) < 50:
            break
        page += 1
    return collected
def fetch_issues(base, token, owner, repo):
    """Return open issues keyed by number, excluding pull requests."""
    issues = {}
    page = 1
    while True:
        url = f"{base}/api/v1/repos/{owner}/{repo}/issues?state=open&limit=50&page={page}"
        batch = _api(url, token)
        if not batch:
            break
        for item in batch:
            # The issues endpoint also lists PRs; keep plain issues only.
            if "pull_request" not in item:
                issues[item["number"]] = item
        if len(batch) < 50:
            break
        page += 1
    return issues
def fetch_repos(base, token, owner):
    """Return the names of all repos in an organization, page by page."""
    names = []
    page = 1
    while True:
        batch = _api(f"{base}/api/v1/orgs/{owner}/repos?limit=50&page={page}", token)
        if not batch:
            break
        names.extend(entry["name"] for entry in batch)
        if len(batch) < 50:
            break
        page += 1
    return names
def categorize(pr):
    """Bucket a PR into a coarse category from title/body/label keywords.

    Fix: the original fell off the end of the loop and implicitly returned
    None when no keyword matched; now returns "other", consistent with
    categorize_pr()'s fallback.
    """
    c = (pr.get("title", "") + " " + pr.get("body", "") + " " +
         " ".join(l.get("name", "") for l in pr.get("labels", []))).lower()
    # First matching keyword wins; list order encodes priority.
    for kw, cat in [
        ("training data", "training-data"), ("dpo", "training-data"), ("grpo", "training-data"),
        ("fix:", "bug-fix"), ("bug", "bug-fix"), ("hotfix", "bug-fix"),
        ("feat:", "feature"), ("feature", "feature"), ("enhancement", "feature"),
        ("refactor", "maintenance"), ("cleanup", "maintenance"), ("chore:", "maintenance"),
        ("doc", "documentation"), ("test", "testing"), ("ci", "infrastructure"),
        ("infra", "infrastructure"), ("deploy", "infrastructure"),
    ]:
        if kw in c:
            return cat
    return "other"
def categorize_pr(title: str, body: str) -> str:
    """Categorize a PR by its title and body."""
    haystack = f"{title} {body}".lower()
    for category, patterns in CATEGORY_RULES.items():
        if category == "other":
            # "other" is the fallback bucket, not a pattern set.
            continue
        if any(re.search(pattern, haystack, re.IGNORECASE) for pattern in patterns):
            return category
    return "other"
def refs(pr):
    """Collect every #N issue number mentioned in a PR's title or body."""
    text = "{} {}".format(pr.get("title", ""), pr.get("body", ""))
    return [int(num) for num in re.findall(r"#(\d+)", text)]
# ---------------------------------------------------------------------------
# Gitea API
# ---------------------------------------------------------------------------
def _load_token() -> str:
    """Read the API token from TOKEN_PATH, exiting with an error if absent.

    Fix: use a context manager so the file handle is closed (the original
    leaked it).
    """
    try:
        with open(TOKEN_PATH) as fh:
            return fh.read().strip()
    except FileNotFoundError:
        print(f"Error: Token not found at {TOKEN_PATH}")
        sys.exit(1)
def find_duplicates(prs):
    """Group open PRs that look like duplicates of one another.

    Two signals:
    - "ref": PRs whose title/body reference the same #N issue.
    - "similarity": PR titles whose SequenceMatcher ratio exceeds 0.85,
      unless the pair was already grouped by a shared ref.
    Returns a list of group dicts with "type", "prs" and either "ref" or
    "similarity" keys.

    Fixes: removed the dead `by_title` dict the original built but never
    read. NOTE(review): a second find_duplicates() defined later in this
    file shadows this one at import time -- confirm which is intended.
    """
    by_ref = defaultdict(list)
    for p in prs:
        text = p.get("title", "") + " " + p.get("body", "")
        for ref in (int(m) for m in re.findall(r"#(\d+)", text)):
            by_ref[ref].append(p)
    dup_groups = []
    seen = set()
    # Ref-based duplicates first.
    for ref, group in by_ref.items():
        if len(group) > 1:
            key = tuple(sorted(p["number"] for p in group))
            if key not in seen:
                seen.add(key)
                dup_groups.append({"type": "ref", "ref": ref, "prs": group})
    # Title-similarity duplicates (threshold 0.85).
    for i, p1 in enumerate(prs):
        for p2 in prs[i + 1:]:
            key = tuple(sorted([p1["number"], p2["number"]]))
            if key in seen:
                continue
            sim = SequenceMatcher(None, p1.get("title", "").lower(), p2.get("title", "").lower()).ratio()
            if sim > 0.85:
                seen.add(key)
                dup_groups.append({"type": "similarity", "similarity": round(sim, 2), "prs": [p1, p2]})
    return dup_groups
def api_get(path: str, token: str) -> Any:
    """GET {GITEA_BASE}{path} with token auth and return the parsed JSON.

    Fix: close the HTTP response via a context manager (the original
    never closed the handle). Raises on network/HTTP errors; callers
    wrap calls in try/except.
    """
    req = urllib.request.Request(f"{GITEA_BASE}{path}")
    req.add_header("Authorization", f"token {token}")
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())
def health(pr, issues):
    """Build a health/triage record for a single PR.

    NOTE(review): reconstructed from a corrupted merge -- the original
    body was interleaved with other functions; the risk formula and the
    return keys below follow the surviving fragments. Confirm against the
    pre-merge version.

    issues: mapping of open issue number -> issue (from fetch_issues).
    Returns a dict consumed by report()/is_safe_to_merge().
    """
    r = refs(pr)
    created = datetime.fromisoformat(pr["created_at"].replace("Z", "+00:00"))
    updated = datetime.fromisoformat(pr["updated_at"].replace("Z", "+00:00"))
    now = datetime.now(timezone.utc)
    age_days = (now - created).days
    stale_days = (now - updated).days
    # Risk score: age + staleness + no refs + not mergeable, capped at 100.
    risk = 0
    risk += min(age_days, 30)        # max 30 for age
    risk += min(stale_days * 2, 40)  # max 40 for staleness
    risk += 10 if not r else 0       # no issue refs
    risk += 15 if pr.get("mergeable") is False else 0  # conflicts
    risk = min(risk, 100)
    return {
        "pr": pr["number"],
        "title": pr["title"],
        "head": pr["head"]["ref"],
        "category": categorize(pr),
        "refs": r,
        "open_issues": [x for x in r if x in issues],
        "closed_issues": [x for x in r if x not in issues],
        "age_days": age_days,
        "stale_days": stale_days,
        "risk_score": risk,
        "mergeable": pr.get("mergeable"),
        "author": pr.get("user", {}).get("login", ""),
        "labels": [l.get("name", "") for l in pr.get("labels", [])],
    }
def get_open_prs(repo: str, token: str) -> list[dict]:
    """Fetch all open PRs for a repo."""
    found: list[dict] = []
    page = 1
    while True:
        try:
            batch = api_get(f"/repos/{ORG}/{repo}/pulls?state=open&limit=50&page={page}", token)
            if not batch:
                break
            found.extend(batch)
            # A short page means pagination is done.
            if len(batch) < 50:
                break
            page += 1
        except Exception:
            # Best-effort: any API failure simply ends the scan.
            break
    return found
# Risk score: age + staleness + no refs + not mergeable
risk = 0
risk += min(age_days, 30) # max 30 for age
risk += min(stale_days * 2, 40) # max 40 for staleness
risk += 10 if not r else 0 # no issue refs
risk += 15 if pr.get("mergeable") is False else 0 # conflicts
risk = min(risk, 100)
def get_issue_state(repo: str, issue_num: int, token: str) -> Optional[str]:
    """Check if a referenced issue is still open.

    Returns the issue's "state" field ("unknown" when absent) or None
    when the lookup fails for any reason.
    """
    try:
        payload = api_get(f"/repos/{ORG}/{repo}/issues/{issue_num}", token)
        return payload.get("state", "unknown")
    except Exception:
        return None
def find_referenced_issues(pr_body: str, pr_title: str) -> list[int]:
    """Pull every #N issue reference out of a PR's title and body."""
    combined = " ".join((pr_title, pr_body))
    return [int(num) for num in re.findall(r'#(\d+)', combined)]
def find_duplicates(prs: list[dict]) -> list[tuple[dict, dict]]:
    """Find PRs that reference the same issue.

    Returns one (pr_a, pr_b) tuple for every pair of PRs that share a
    referenced issue number.
    """
    issue_to_prs: dict[int, list[dict]] = {}
    for pr in prs:
        for issue_num in find_referenced_issues(pr.get("body", ""), pr.get("title", "")):
            issue_to_prs.setdefault(issue_num, []).append(pr)
    pairs: list[tuple[dict, dict]] = []
    for claimants in issue_to_prs.values():
        if len(claimants) < 2:
            continue
        # Emit every i<j pairing within the group.
        for i, first in enumerate(claimants):
            for second in claimants[i + 1:]:
                pairs.append((first, second))
    return pairs
# ---------------------------------------------------------------------------
# Triage
# ---------------------------------------------------------------------------
def triage_repo(repo: str, token: str) -> dict:
    """Triage all open PRs for a repo.

    Fix: the return dict had been fused by a corrupted merge with another
    function's return (referencing undefined names like `r`, `age_days`,
    `risk`); it now contains only this function's summary keys.

    Returns {"repo", "total_prs", "by_category", "categorized",
    "duplicates" (pairs of PR numbers), "stale_issues"}.
    """
    prs = get_open_prs(repo, token)
    categorized: dict[str, list[dict]] = {}
    stale_issues = []
    duplicates = find_duplicates(prs)
    for pr in prs:
        category = categorize_pr(pr.get("title", ""), pr.get("body", ""))
        categorized.setdefault(category, []).append(pr)
        # Flag PRs whose referenced issues are already closed.
        issue_refs = find_referenced_issues(pr.get("body", ""), pr.get("title", ""))
        for issue_num in issue_refs:
            state = get_issue_state(repo, issue_num, token)
            if state == "closed":
                stale_issues.append({"pr": pr["number"], "issue": issue_num, "repo": repo})
    return {
        "repo": repo,
        "total_prs": len(prs),
        "by_category": {k: len(v) for k, v in categorized.items()},
        "categorized": categorized,
        "duplicates": [(a["number"], b["number"]) for a, b in duplicates],
        "stale_issues": stale_issues,
    }
def is_safe_to_merge(h):
    """Determine if a PR is safe to auto-merge.

    Returns (ok, reason). Only training-data PRs qualify, and they must be
    cleanly mergeable, at most 30 days stale, and have risk score <= 50.
    """
    if h["category"] != "training-data":
        return False, "not training-data"
    mergeable = h["mergeable"]
    if mergeable is False:
        return False, "has conflicts"
    if mergeable is None:
        return False, "mergeable status unknown"
    stale = h["stale_days"]
    if stale > 30:
        return False, f"too stale ({stale}d)"
    risk = h["risk_score"]
    if risk > 50:
        return False, f"risk too high ({risk})"
    return True, "safe"
def triage_all(repos: list[str], token: str) -> list[dict]:
    """Triage all repos.

    A failure in one repo is recorded as {"repo", "error"} and does not
    stop the remaining repos from being processed.
    """
    summaries: list[dict] = []
    for name in repos:
        print(f" Triaging {name}...", file=sys.stderr)
        try:
            summaries.append(triage_repo(name, token))
        except Exception as exc:
            print(f" Error triaging {name}: {exc}", file=sys.stderr)
            summaries.append({"repo": name, "error": str(exc)})
    return summaries
def auto_merge(base, token, owner, repo, pr_num, dry_run=True):
    """Attempt to merge a PR via the Gitea merge endpoint.

    With dry_run=True (the default) nothing is sent; a stub result is
    returned so callers can report what would have happened.
    """
    if dry_run:
        return {"merged": False, "dry_run": True, "pr": pr_num}
    endpoint = f"{base}/api/v1/repos/{owner}/{repo}/pulls/{pr_num}/merge"
    payload = {
        "MergeTitleField": "auto",
        "MergeMessageField": "auto",
        "Do": "merge",
    }
    outcome = _api(endpoint, token, method="POST", data=payload)
    # _api returns None on HTTP error, so a non-None result means merged.
    return {"merged": outcome is not None, "pr": pr_num, "result": outcome}
def generate_markdown_report(results: list[dict]) -> str:
    """Generate a markdown triage report across repos.

    results: summary dicts from triage_repo()/triage_all() (entries with an
    "error" key render as an error section).

    NOTE(review): reconstructed from a corrupted merge -- this function's
    body was interleaved with report(); the section layout below follows
    the surviving fragments and should be confirmed against the pre-merge
    version.
    """
    total_prs = sum(r.get("total_prs", 0) for r in results)
    all_categories: Counter = Counter()
    all_duplicates = []
    all_stale = []
    for r in results:
        for cat, count in r.get("by_category", {}).items():
            all_categories[cat] += count
        all_duplicates.extend(r.get("duplicates", []))
        all_stale.extend(r.get("stale_issues", []))
    lines = [
        "# PR Triage Report",
        "",
        f"Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}",
        "",
        "## Summary",
        "",
        "| Metric | Count |",
        "|--------|-------|",
        f"| Total open PRs | {total_prs} |",
        f"| Repos scanned | {len(results)} |",
        f"| Duplicates found | {len(all_duplicates)} |",
        f"| Stale (issue closed) | {len(all_stale)} |",
        "",
        "## By Category",
        "",
        "| Category | Count |",
        "|----------|-------|",
    ]
    for cat, count in all_categories.most_common():
        lines.append(f"| {cat} | {count} |")
    if all_duplicates:
        lines.extend(["", "## Duplicates (same issue referenced)", ""])
        for a, b in all_duplicates:
            lines.append(f"- PR #{a} and PR #{b}")
    if all_stale:
        lines.extend(["", "## Stale PRs (referenced issue is closed)", ""])
        for s in all_stale:
            lines.append(f"- {s['repo']} PR #{s['pr']} → issue #{s['issue']} (closed)")
    # Per-repo detail
    for r in results:
        if r.get("error"):
            lines.extend(["", f"## {r['repo']} — ERROR", "", f"```{r['error']}```"])
            continue
        lines.extend(["", f"## {r['repo']} ({r.get('total_prs', 0)} open PRs)", ""])
        for cat, prs in r.get("categorized", {}).items():
            if not prs:
                continue
            lines.append(f"### {cat} ({len(prs)})")
            for pr in prs:
                lines.append(f"- #{pr['number']}: {pr['title']}")
            lines.append("")
    return "\n".join(lines)
def report(repo, checks, dups):
    """Render a plain-markdown triage report for a single repo.

    checks: health() dicts; dups: group dicts from the group-dict variant
    of find_duplicates() (with "type"/"prs" keys).

    NOTE(review): reconstructed from a corrupted merge -- this function's
    lines were interleaved with generate_markdown_report(); the section
    order follows the surviving fragments and should be confirmed against
    the pre-merge version.
    """
    lines = [
        f"# PR Triage -- {repo}",
        f"Generated: {datetime.now().strftime('%Y-%m-%d %H:%M')}",
        f"Open PRs: {len(checks)}",
        "",
    ]
    cats = Counter(h["category"] for h in checks)
    lines.append("| Category | Count |")
    lines.append("|----------|-------|")
    for c, n in cats.most_common():
        lines.append(f"| {c} | {n} |")
    stale = [h for h in checks if h["stale_days"] > 7]
    high_risk = [h for h in checks if h["risk_score"] > 50]
    safe_merge = [h for h in checks if is_safe_to_merge(h)[0]]
    lines.extend([
        "",
        f"Stale (>7d): {len(stale)}",
        f"High risk (>50): {len(high_risk)}",
        f"Safe to merge: {len(safe_merge)}",
        f"Duplicate groups: {len(dups)}",
        "",
    ])
    if safe_merge:
        lines.append("## Safe to Auto-Merge")
        for h in safe_merge:
            ok, reason = is_safe_to_merge(h)
            lines.append(f"- #{h['pr']}: {h['title'][:60]} ({reason})")
        lines.append("")
    if dups:
        lines.append("## Duplicates")
        for g in dups:
            pr_nums = [str(p["number"]) for p in g["prs"]]
            lines.append(f"[{g['type']}] PRs {', '.join('#' + n for n in pr_nums)}:")
            for p in g["prs"]:
                lines.append(f" - #{p['number']}: {p['title']}")
            lines.append("")
    if stale:
        lines.append("## Stale (>7d)")
        for h in sorted(stale, key=lambda x: x["stale_days"], reverse=True):
            lines.append(f"- #{h['pr']}: {h['title'][:50]} -- {h['stale_days']}d (risk: {h['risk_score']})")
        lines.append("")
    lines.append("## All PRs")
    lines.append("| # | Title | Category | Age | Stale | Risk | Merge |")
    lines.append("|---|-------|----------|-----|-------|------|-------|")
    for h in sorted(checks, key=lambda x: x["pr"]):
        m = "Y" if h["mergeable"] else ("N" if h["mergeable"] is False else "?")
        s = f"{h['stale_days']}d" if h["stale_days"] > 7 else "-"
        lines.append(f"| {h['pr']} | {h['title'][:45]} | {h['category']} | {h['age_days']}d | {s} | {h['risk_score']} | {m} |")
    return "\n".join(lines)
def main():
    """CLI entry point: parse args, triage PRs, optionally auto-merge, report."""
    p = argparse.ArgumentParser(description="PR Triage Automation")
    p.add_argument("--base-url", default="https://forge.alexanderwhitestone.com")
    p.add_argument("--owner", default="Timmy_Foundation")
    p.add_argument("--repo", default="")
    p.add_argument("--all-repos", action="store_true", help="Triage all org repos")
    p.add_argument("--json", action="store_true", dest="js")
    p.add_argument("--output", default="")
    p.add_argument("--auto-merge", action="store_true", help="Auto-merge safe PRs")
    p.add_argument("--dry-run", action="store_true", help="Show what would be merged without merging")
    a = p.parse_args()
    token = _token()
    if not token:
        print("No token"); sys.exit(1)
    # Org-wide mode: scan every repo in the org, summarize, then exit.
    if a.all_repos:
        repos = fetch_repos(a.base_url, token, a.owner)
        all_checks = []
        all_dups = []
        for repo in repos:
            prs = fetch_prs(a.base_url, token, a.owner, repo)
            issues = fetch_issues(a.base_url, token, a.owner, repo)
            checks = [health(pr, issues) for pr in prs]
            # NOTE(review): this file defines find_duplicates twice; the later
            # definition (issue-number tuple pairs) wins at runtime, but the
            # g["type"]/g["prs"] access further below expects the group-dict
            # variant -- confirm which implementation is intended.
            dups = find_duplicates(prs)
            for c in checks:
                c["repo"] = repo
            all_checks.extend(checks)
            all_dups.extend(dups)
        if a.js:
            print(json.dumps({"repos": repos, "prs": all_checks, "duplicates_count": len(all_dups)}, indent=2))
        else:
            print(f"Org-wide triage: {len(repos)} repos, {len(all_checks)} PRs, {len(all_dups)} duplicate groups")
            cats = Counter(h["category"] for h in all_checks)
            for c, n in cats.most_common():
                print(f" {c}: {n}")
        return
    # Single-repo mode: infer owner/repo from the git remote when --repo
    # is not given.
    repo = a.repo
    if not repo:
        try:
            remote = subprocess.check_output(["git", "remote", "get-url", "origin"], text=True).strip()
            m = re.search(r"[/:](\w[\w-]*)/(\w[\w-]*?)(?:\.git)?$", remote)
            if m:
                a.owner, repo = m.group(1), m.group(2)
        except Exception:
            pass
    if not repo:
        print("No repo specified"); sys.exit(1)
    print(f"Triaging {a.owner}/{repo}...", file=sys.stderr)
    prs = fetch_prs(a.base_url, token, a.owner, repo)
    issues = fetch_issues(a.base_url, token, a.owner, repo)
    checks = [health(pr, issues) for pr in prs]
    dups = find_duplicates(prs)
    # Auto-merge
    merge_results = []
    if a.auto_merge or a.dry_run:
        safe = [h for h in checks if is_safe_to_merge(h)[0]]
        if safe:
            print(f"Auto-merge: {len(safe)} safe PRs ({'dry-run' if a.dry_run else 'live'})", file=sys.stderr)
        for h in safe:
            result = auto_merge(a.base_url, token, a.owner, repo, h["pr"], dry_run=a.dry_run)
            merge_results.append(result)
            status = "WOULD MERGE" if a.dry_run else ("MERGED" if result["merged"] else "FAILED")
            print(f" #{h['pr']}: {status}", file=sys.stderr)
    if a.js:
        # NOTE(review): g["type"]/g["prs"] here assume the group-dict
        # find_duplicates; at runtime the tuple-pair version is in scope.
        out = {
            "repo": repo, "prs": checks,
            "duplicates": [{"type": g["type"], "prs": [p["number"] for p in g["prs"]]} for g in dups],
            "merge_results": merge_results,
        }
        print(json.dumps(out, indent=2))
    else:
        r = report(repo, checks, dups)
        print(r)
        if a.output:
            with open(a.output, "w") as f:
                f.write(r)
    print(f"\n{len(checks)} PRs, {len(dups)} duplicate groups, {len(merge_results)} merges",
          file=sys.stderr)
if __name__ == "__main__":
    main()
lines.extend([f"", f"## {r['repo']} ({r.get('total_prs', 0)} open PRs)", ""])
for cat, prs in r.get("categorized", {}).items():
if not prs:
continue
lines.append(f"