#!/usr/bin/env python3
"""
PR Triage Automation — Categorize, deduplicate, and report on open PRs.

Usage:
    python scripts/pr_triage.py                    # Generate report
    python scripts/pr_triage.py --json             # JSON output
    python scripts/pr_triage.py --auto-merge       # Auto-merge safe PRs
    python scripts/pr_triage.py --repo timmy-home  # Single repo
"""

import json
import os
import re
import sys
import urllib.request
from collections import Counter
from datetime import datetime, timezone
from itertools import combinations
from pathlib import Path
from typing import Any, Optional

# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------

GITEA_BASE = os.environ.get("GITEA_API_BASE", "https://forge.alexanderwhitestone.com/api/v1")
TOKEN_PATH = os.environ.get("GITEA_TOKEN_PATH", str(Path.home() / ".config/gitea/token"))
ORG = "Timmy_Foundation"

DEFAULT_REPOS = [
    "timmy-home",
    "hermes-agent",
    "timmy-config",
    "the-nexus",
    "the-door",
    "burn-fleet",
    "second-son-of-timmy",
]

# Page size requested from the pulls endpoint; a shorter page ends pagination.
PAGE_SIZE = 50

# ---------------------------------------------------------------------------
# Categories
# ---------------------------------------------------------------------------

# Ordered mapping of category -> regex patterns. Patterns are searched
# case-insensitively against "<title> <body>"; the first category with any
# matching pattern wins, so dict order is the priority order.
CATEGORY_RULES = {
    "training-data": [
        r"training[- ]?data",
        r"scene[- ]?description",
        # Word-bounded so that e.g. "endpoint" is not classified as DPO work.
        r"\bdpo\b",
        r"training",
        r"batch[- ]?\d+",
        r"training[- ]?pipeline",
        r"jsonl",
    ],
    "bug-fix": [
        r"^fix[\(:]",
        r"\[BUG\]",
        r"\[FIX\]",
        r"bug fix",
        r"fixes #\d+",
        r"closes #\d+",
        r"broken",
        r"crash",
        r"regression",
    ],
    "feature": [
        r"^feat[\(:]",
        r"\[FEAT\]",
        r"\[FEATURE\]",
        r"new feature",
        r"add .+ support",
        r"implement",
    ],
    "docs": [
        r"^docs[\(:]",
        r"documentation",
        r"readme",
        r"genome",
    ],
    "security": [
        r"\[SECURITY\]",
        r"\[VITALIK\]",
        r"shield",
        r"injection",
        r"vulnerability",
        r"hardening",
    ],
    "infra": [
        r"\[INFRA\]",
        r"deploy",
        r"ansible",
        r"docker",
        r"ci[/ ]cd",
        # Leading boundary keeps "cronjob"/"crontab" but drops "acronym".
        r"\bcron",
        r"watchdog",
        r"systemd",
    ],
    "research": [
        r"research",
        r"benchmark",
        r"evaluation",
        r"analysis",
        r"\[BIG-BRAIN\]",
        r"investigate",
    ],
    "other": [],  # fallback
}


def categorize_pr(title: str, body: str) -> str:
    """Return the first category whose patterns match the PR title or body.

    Args:
        title: PR title; ``None`` is treated as empty.
        body: PR description; ``None`` is treated as empty.

    Returns:
        A key of ``CATEGORY_RULES``; ``"other"`` when nothing matches.
    """
    # A JSON null arrives as None; coalesce so the literal text "None" is
    # never searched. The title comes first so "^"-anchored patterns only
    # ever match the start of the title.
    text = f"{title or ''} {body or ''}".lower()
    for category, patterns in CATEGORY_RULES.items():
        if category == "other":
            continue
        if any(re.search(pattern, text, re.IGNORECASE) for pattern in patterns):
            return category
    return "other"


# ---------------------------------------------------------------------------
# Gitea API
# ---------------------------------------------------------------------------

def _load_token() -> str:
    """Read the API token from ``TOKEN_PATH``.

    Returns:
        The file contents with surrounding whitespace stripped.

    Raises:
        SystemExit: With status 1 when the token file does not exist.
    """
    try:
        # read_text opens and closes the file, so no handle is leaked.
        return Path(TOKEN_PATH).read_text(encoding="utf-8").strip()
    except FileNotFoundError:
        # Diagnostics go to stderr so stdout stays clean for report output.
        print(f"Error: Token not found at {TOKEN_PATH}", file=sys.stderr)
        sys.exit(1)


def api_get(path: str, token: str) -> Any:
    """Perform an authenticated GET against the API and decode the JSON body.

    Args:
        path: Path (including any query string) appended to ``GITEA_BASE``.
        token: API token sent in the ``Authorization`` header.

    Returns:
        The decoded JSON payload.

    Raises:
        urllib.error.URLError: On connection or HTTP-status failures.
        ValueError: When the response body is not valid JSON.
    """
    req = urllib.request.Request(
        f"{GITEA_BASE}{path}",
        headers={"Authorization": f"token {token}"},
    )
    # The context manager closes the connection even if decoding fails.
    with urllib.request.urlopen(req, timeout=30) as resp:
        return json.loads(resp.read())


def get_open_prs(repo: str, token: str) -> list[dict]:
    """Fetch all open PRs for a repo, following pagination.

    Best-effort: if a page cannot be fetched, a warning is written to stderr
    and the PRs collected so far are returned.

    Args:
        repo: Repository name inside ``ORG``.
        token: API token.

    Returns:
        The PR objects returned by the API (possibly empty).
    """
    prs: list[dict] = []
    page = 1
    while True:
        path = f"/repos/{ORG}/{repo}/pulls?state=open&limit={PAGE_SIZE}&page={page}"
        try:
            batch = api_get(path, token)
        except Exception as exc:  # API boundary: keep partial results, but say so.
            print(
                f"  Warning: could not fetch PRs for {repo} (page {page}): {exc}",
                file=sys.stderr,
            )
            break
        if not isinstance(batch, list):
            # A non-list payload is not a page of PRs; stop instead of
            # extending the result with the keys of an unexpected dict.
            print(
                f"  Warning: unexpected PR payload for {repo} (page {page})",
                file=sys.stderr,
            )
            break
        if not batch:
            break
        prs.extend(batch)
        if len(batch) < PAGE_SIZE:
            # Short page: nothing left to fetch.
            break
        page += 1
    return prs


def get_issue_state(repo: str, issue_num: int, token: str) -> Optional[str]:
    """Look up the state of a referenced issue.

    Args:
        repo: Repository name inside ``ORG``.
        issue_num: Issue number to look up.
        token: API token.

    Returns:
        The issue's ``state`` field (``"unknown"`` if the field is missing),
        or ``None`` when the lookup fails for any reason.
    """
    try:
        issue = api_get(f"/repos/{ORG}/{repo}/issues/{issue_num}", token)
        return issue.get("state", "unknown")
    except Exception:
        # Deliberately quiet: "#N" text does not always name a real issue,
        # so failed lookups are expected here and reported as "no state".
        return None


def find_referenced_issues(pr_body: str, pr_title: str) -> list[int]:
    """Extract the distinct issue numbers referenced in a PR's title/body.

    Args:
        pr_body: PR description; ``None`` is treated as empty.
        pr_title: PR title; ``None`` is treated as empty.

    Returns:
        Each ``#<number>`` reference once, in order of first appearance
        (title first, then body).
    """
    text = f"{pr_title or ''} {pr_body or ''}"
    numbers = (int(match) for match in re.findall(r"#(\d+)", text))
    # dict.fromkeys de-duplicates while preserving order; without this a PR
    # that mentions the same issue twice would later be paired with itself.
    return list(dict.fromkeys(numbers))


def find_duplicates(prs: list[dict]) -> list[tuple[dict, dict]]:
    """Find pairs of PRs that reference the same issue.

    Args:
        prs: PR objects; ``title`` and ``body`` are read from each.

    Returns:
        One ``(earlier, later)`` tuple per distinct pair of PRs sharing at
        least one referenced issue, ordered by first discovery. A pair that
        shares several issues is reported once.
    """
    # Track positions rather than the dicts themselves so that pairs can be
    # de-duplicated with a set (dicts are unhashable).
    issue_to_positions: dict[int, list[int]] = {}
    for position, pr in enumerate(prs):
        for issue_num in find_referenced_issues(pr.get("body", ""), pr.get("title", "")):
            issue_to_positions.setdefault(issue_num, []).append(position)

    seen_pairs: set[tuple[int, int]] = set()
    duplicates: list[tuple[dict, dict]] = []
    for positions in issue_to_positions.values():
        for pair in combinations(positions, 2):
            if pair in seen_pairs:
                continue
            seen_pairs.add(pair)
            duplicates.append((prs[pair[0]], prs[pair[1]]))
    return duplicates


# ---------------------------------------------------------------------------
# Triage
# ---------------------------------------------------------------------------

def triage_repo(repo: str, token: str) -> dict:
    """Triage all open PRs for a repo.

    Args:
        repo: Repository name inside ``ORG``.
        token: API token.

    Returns:
        A dict with keys ``repo``, ``total_prs``, ``by_category`` (category ->
        count), ``categorized`` (category -> PR objects), ``duplicates``
        (pairs of PR numbers) and ``stale_issues`` (one entry per PR/issue
        combination whose referenced issue is closed).
    """
    prs = get_open_prs(repo, token)
    categorized: dict[str, list[dict]] = {}
    stale_issues: list[dict] = []
    # Several PRs often reference the same issue; look each one up once.
    issue_states: dict[int, Optional[str]] = {}

    for pr in prs:
        title = pr.get("title", "")
        body = pr.get("body", "")
        categorized.setdefault(categorize_pr(title, body), []).append(pr)

        for issue_num in find_referenced_issues(body, title):
            if issue_num not in issue_states:
                issue_states[issue_num] = get_issue_state(repo, issue_num, token)
            if issue_states[issue_num] == "closed":
                stale_issues.append({"pr": pr["number"], "issue": issue_num, "repo": repo})

    duplicates = find_duplicates(prs)
    return {
        "repo": repo,
        "total_prs": len(prs),
        "by_category": {category: len(items) for category, items in categorized.items()},
        "categorized": categorized,
        "duplicates": [(first["number"], second["number"]) for first, second in duplicates],
        "stale_issues": stale_issues,
    }


def triage_all(repos: list[str], token: str) -> list[dict]:
    """Triage every repo, recording per-repo failures instead of aborting.

    Args:
        repos: Repository names inside ``ORG``.
        token: API token.

    Returns:
        One result per repo, in input order: either the ``triage_repo``
        result or ``{"repo": ..., "error": ...}`` when triage raised.
    """
    results = []
    for repo in repos:
        print(f"  Triaging {repo}...", file=sys.stderr)
        try:
            results.append(triage_repo(repo, token))
        except Exception as e:  # per-repo boundary: one bad repo must not stop the run
            print(f"  Error triaging {repo}: {e}", file=sys.stderr)
            results.append({"repo": repo, "error": str(e)})
    return results
Counter = Counter() all_duplicates = [] all_stale = [] for r in results: for cat, count in r.get("by_category", {}).items(): all_categories[cat] += count all_duplicates.extend(r.get("duplicates", [])) all_stale.extend(r.get("stale_issues", [])) lines = [ "# PR Triage Report", "", f"Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}", "", "## Summary", "", f"| Metric | Count |", f"|--------|-------|", f"| Total open PRs | {total_prs} |", f"| Repos scanned | {len(results)} |", f"| Duplicates found | {len(all_duplicates)} |", f"| Stale (issue closed) | {len(all_stale)} |", "", "## By Category", "", "| Category | Count |", "|----------|-------|", ] for cat, count in all_categories.most_common(): lines.append(f"| {cat} | {count} |") if all_duplicates: lines.extend(["", "## Duplicates (same issue referenced)", ""]) for a, b in all_duplicates: lines.append(f"- PR #{a} and PR #{b}") if all_stale: lines.extend(["", "## Stale PRs (referenced issue is closed)", ""]) for s in all_stale: lines.append(f"- {s['repo']} PR #{s['pr']} → issue #{s['issue']} (closed)") # Per-repo detail for r in results: if r.get("error"): lines.extend(["", f"## {r['repo']} — ERROR", "", f"```{r['error']}```"]) continue lines.extend([f"", f"## {r['repo']} ({r.get('total_prs', 0)} open PRs)", ""]) for cat, prs in r.get("categorized", {}).items(): if not prs: continue lines.append(f"