#!/usr/bin/env python3
"""
PR Triage Automation — Categorize, deduplicate, and report on open PRs.

Usage:
    python scripts/pr_triage.py               # Generate report
    python scripts/pr_triage.py --json        # JSON output
    python scripts/pr_triage.py --auto-merge  # Auto-merge safe PRs
    python scripts/pr_triage.py --repo timmy-home  # Single repo
"""

import json
import os
import re
import sys
import urllib.request  # stdlib: always present, no import guard needed
from collections import Counter
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Optional

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------

GITEA_BASE = os.environ.get("GITEA_API_BASE", "https://forge.alexanderwhitestone.com/api/v1")
TOKEN_PATH = os.environ.get("GITEA_TOKEN_PATH", str(Path.home() / ".config/gitea/token"))
ORG = "Timmy_Foundation"

DEFAULT_REPOS = [
    "timmy-home",
    "hermes-agent",
    "timmy-config",
    "the-nexus",
    "the-door",
    "burn-fleet",
    "second-son-of-timmy",
]

# ---------------------------------------------------------------------------
# Categories
# ---------------------------------------------------------------------------

# Ordered: categorize_pr returns the FIRST category whose patterns match,
# so earlier entries take priority. "other" is the explicit fallback.
CATEGORY_RULES = {
    "training-data": [
        r"training[- ]?data", r"scene[- ]?description", r"dpo", r"training",
        r"batch[- ]?\d+", r"training[- ]?pipeline", r"jsonl",
    ],
    "bug-fix": [
        r"^fix[\(:]", r"\[BUG\]", r"\[FIX\]", r"bug fix", r"fixes #\d+",
        r"closes #\d+", r"broken", r"crash", r"regression",
    ],
    "feature": [
        r"^feat[\(:]", r"\[FEAT\]", r"\[FEATURE\]", r"new feature",
        r"add .+ support", r"implement",
    ],
    "docs": [
        r"^docs[\(:]", r"documentation", r"readme", r"genome",
    ],
    "security": [
        r"\[SECURITY\]", r"\[VITALIK\]", r"shield", r"injection",
        r"vulnerability", r"hardening",
    ],
    "infra": [
        r"\[INFRA\]", r"deploy", r"ansible", r"docker", r"ci[/ ]cd",
        r"cron", r"watchdog", r"systemd",
    ],
    "research": [
        r"research", r"benchmark", r"evaluation", r"analysis",
        r"\[BIG-BRAIN\]", r"investigate",
    ],
    "other": [],  # fallback
}


def categorize_pr(title: str, body: str) -> str:
    """Categorize a PR by its title and body.

    Scans CATEGORY_RULES in declaration order and returns the first category
    with a matching pattern; falls back to "other". Matching is
    case-insensitive (text is lowercased AND re.IGNORECASE is passed, so
    uppercase pattern literals like ``\\[BUG\\]`` still match).
    """
    text = f"{title} {body}".lower()
    for category, patterns in CATEGORY_RULES.items():
        if category == "other":
            continue
        for pattern in patterns:
            if re.search(pattern, text, re.IGNORECASE):
                return category
    return "other"


# ---------------------------------------------------------------------------
# Gitea API
# ---------------------------------------------------------------------------

def _load_token() -> str:
    """Read the Gitea API token from TOKEN_PATH; exit with an error if absent.

    Uses Path.read_text so the file handle is closed deterministically
    (the previous bare ``open(...).read()`` leaked the handle).
    """
    try:
        return Path(TOKEN_PATH).read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        print(f"Error: Token not found at {TOKEN_PATH}")
        sys.exit(1)


def api_get(path: str, token: str) -> Any:
    """GET ``GITEA_BASE + path`` with token auth and return the parsed JSON.

    The response is used as a context manager so the underlying socket is
    always closed (the previous version leaked it).
    """
    req = urllib.request.Request(f"{GITEA_BASE}{path}")
    req.add_header("Authorization", f"token {token}")
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())


def get_open_prs(repo: str, token: str) -> list[dict]:
    """Fetch all open PRs for a repo, following pagination (50 per page).

    Deliberately best-effort: any API error ends pagination and returns
    whatever was collected so far, so one bad repo doesn't abort a triage run.
    """
    prs = []
    page = 1
    while True:
        try:
            batch = api_get(f"/repos/{ORG}/{repo}/pulls?state=open&limit=50&page={page}", token)
            if not batch:
                break
            prs.extend(batch)
            if len(batch) < 50:
                # Short page: this was the last one, skip the extra request.
                break
            page += 1
        except Exception:
            # Best-effort: stop paginating on any failure.
            break
    return prs


def get_issue_state(repo: str, issue_num: int, token: str) -> Optional[str]:
    """Return the state ("open"/"closed") of a referenced issue, or None on error."""
    try:
        issue = api_get(f"/repos/{ORG}/{repo}/issues/{issue_num}", token)
        return issue.get("state", "unknown")
    except Exception:
        return None


def find_referenced_issues(pr_body: str, pr_title: str) -> list[int]:
    """Extract issue numbers referenced (``#123``) in PR body/title.

    Returns numbers in order of appearance (title first), duplicates kept —
    callers dedupe if needed.
    """
    text = f"{pr_title} {pr_body}"
    return [int(m) for m in re.findall(r'#(\d+)', text)]
list[int]: + """Extract issue numbers referenced in PR body/title.""" + text = f"{pr_title} {pr_body}" + return [int(m) for m in re.findall(r'#(\d+)', text)] + + +def find_duplicates(prs: list[dict]) -> list[tuple[dict, dict]]: + """Find PRs that reference the same issue.""" + issue_to_prs: dict[int, list[dict]] = {} + for pr in prs: + refs = find_referenced_issues(pr.get("body", ""), pr.get("title", "")) + for issue_num in refs: + issue_to_prs.setdefault(issue_num, []).append(pr) + + duplicates = [] + for issue_num, pr_list in issue_to_prs.items(): + if len(pr_list) > 1: + # Pair up duplicates + for i in range(len(pr_list)): + for j in range(i + 1, len(pr_list)): + duplicates.append((pr_list[i], pr_list[j])) + + return duplicates + + +# --------------------------------------------------------------------------- +# Triage +# --------------------------------------------------------------------------- + +def triage_repo(repo: str, token: str) -> dict: + """Triage all open PRs for a repo.""" + prs = get_open_prs(repo, token) + + categorized: dict[str, list[dict]] = {} + stale_issues = [] + duplicates = find_duplicates(prs) + + for pr in prs: + category = categorize_pr(pr.get("title", ""), pr.get("body", "")) + categorized.setdefault(category, []).append(pr) + + # Check referenced issues + refs = find_referenced_issues(pr.get("body", ""), pr.get("title", "")) + for issue_num in refs: + state = get_issue_state(repo, issue_num, token) + if state == "closed": + stale_issues.append({"pr": pr["number"], "issue": issue_num, "repo": repo}) + + return { + "repo": repo, + "total_prs": len(prs), + "by_category": {k: len(v) for k, v in categorized.items()}, + "categorized": categorized, + "duplicates": [(a["number"], b["number"]) for a, b in duplicates], + "stale_issues": stale_issues, + } + + +def triage_all(repos: list[str], token: str) -> list[dict]: + """Triage all repos.""" + results = [] + for repo in repos: + print(f" Triaging {repo}...", file=sys.stderr) + try: + result = 
triage_repo(repo, token) + results.append(result) + except Exception as e: + print(f" Error triaging {repo}: {e}", file=sys.stderr) + results.append({"repo": repo, "error": str(e)}) + return results + + +# --------------------------------------------------------------------------- +# Report +# --------------------------------------------------------------------------- + +def generate_markdown_report(results: list[dict]) -> str: + """Generate a markdown triage report.""" + total_prs = sum(r.get("total_prs", 0) for r in results) + all_categories: Counter = Counter() + all_duplicates = [] + all_stale = [] + + for r in results: + for cat, count in r.get("by_category", {}).items(): + all_categories[cat] += count + all_duplicates.extend(r.get("duplicates", [])) + all_stale.extend(r.get("stale_issues", [])) + + lines = [ + "# PR Triage Report", + "", + f"Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}", + "", + "## Summary", + "", + f"| Metric | Count |", + f"|--------|-------|", + f"| Total open PRs | {total_prs} |", + f"| Repos scanned | {len(results)} |", + f"| Duplicates found | {len(all_duplicates)} |", + f"| Stale (issue closed) | {len(all_stale)} |", + "", + "## By Category", + "", + "| Category | Count |", + "|----------|-------|", + ] + + for cat, count in all_categories.most_common(): + lines.append(f"| {cat} | {count} |") + + if all_duplicates: + lines.extend(["", "## Duplicates (same issue referenced)", ""]) + for a, b in all_duplicates: + lines.append(f"- PR #{a} and PR #{b}") + + if all_stale: + lines.extend(["", "## Stale PRs (referenced issue is closed)", ""]) + for s in all_stale: + lines.append(f"- {s['repo']} PR #{s['pr']} → issue #{s['issue']} (closed)") + + # Per-repo detail + for r in results: + if r.get("error"): + lines.extend(["", f"## {r['repo']} — ERROR", "", f"```{r['error']}```"]) + continue + + lines.extend([f"", f"## {r['repo']} ({r.get('total_prs', 0)} open PRs)", ""]) + for cat, prs in r.get("categorized", 
{}).items(): + if not prs: + continue + lines.append(f" \ No newline at end of file