|
|
|
|
@@ -0,0 +1,271 @@
|
|
|
|
|
#!/usr/bin/env python3
|
|
|
|
|
"""
|
|
|
|
|
PR Triage Automation — Categorize, deduplicate, and report on open PRs.
|
|
|
|
|
|
|
|
|
|
Usage:
|
|
|
|
|
python scripts/pr_triage.py # Generate report
|
|
|
|
|
python scripts/pr_triage.py --json # JSON output
|
|
|
|
|
python scripts/pr_triage.py --auto-merge # Auto-merge safe PRs
|
|
|
|
|
python scripts/pr_triage.py --repo timmy-home # Single repo
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
import json
|
|
|
|
|
import os
|
|
|
|
|
import re
|
|
|
|
|
import sys
|
|
|
|
|
from collections import Counter
|
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
|
from pathlib import Path
|
|
|
|
|
from typing import Any, Optional
|
|
|
|
|
|
|
|
|
|
try:
|
|
|
|
|
import urllib.request
|
|
|
|
|
except ImportError:
|
|
|
|
|
print("Error: urllib not available")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Config
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
GITEA_BASE = os.environ.get("GITEA_API_BASE", "https://forge.alexanderwhitestone.com/api/v1")
|
|
|
|
|
TOKEN_PATH = os.environ.get("GITEA_TOKEN_PATH", str(Path.home() / ".config/gitea/token"))
|
|
|
|
|
ORG = "Timmy_Foundation"
|
|
|
|
|
|
|
|
|
|
DEFAULT_REPOS = [
|
|
|
|
|
"timmy-home",
|
|
|
|
|
"hermes-agent",
|
|
|
|
|
"timmy-config",
|
|
|
|
|
"the-nexus",
|
|
|
|
|
"the-door",
|
|
|
|
|
"burn-fleet",
|
|
|
|
|
"second-son-of-timmy",
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Categories
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
CATEGORY_RULES = {
|
|
|
|
|
"training-data": [
|
|
|
|
|
r"training[- ]?data", r"scene[- ]?description", r"dpo", r"training",
|
|
|
|
|
r"batch[- ]?\d+", r"training[- ]?pipeline", r"jsonl",
|
|
|
|
|
],
|
|
|
|
|
"bug-fix": [
|
|
|
|
|
r"^fix[\(:]", r"\[BUG\]", r"\[FIX\]", r"bug fix", r"fixes #\d+",
|
|
|
|
|
r"closes #\d+", r"broken", r"crash", r"regression",
|
|
|
|
|
],
|
|
|
|
|
"feature": [
|
|
|
|
|
r"^feat[\(:]", r"\[FEAT\]", r"\[FEATURE\]", r"new feature",
|
|
|
|
|
r"add .+ support", r"implement",
|
|
|
|
|
],
|
|
|
|
|
"docs": [
|
|
|
|
|
r"^docs[\(:]", r"documentation", r"readme", r"genome",
|
|
|
|
|
],
|
|
|
|
|
"security": [
|
|
|
|
|
r"\[SECURITY\]", r"\[VITALIK\]", r"shield", r"injection",
|
|
|
|
|
r"vulnerability", r"hardening",
|
|
|
|
|
],
|
|
|
|
|
"infra": [
|
|
|
|
|
r"\[INFRA\]", r"deploy", r"ansible", r"docker", r"ci[/ ]cd",
|
|
|
|
|
r"cron", r"watchdog", r"systemd",
|
|
|
|
|
],
|
|
|
|
|
"research": [
|
|
|
|
|
r"research", r"benchmark", r"evaluation", r"analysis",
|
|
|
|
|
r"\[BIG-BRAIN\]", r"investigate",
|
|
|
|
|
],
|
|
|
|
|
"other": [], # fallback
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def categorize_pr(title: str, body: str) -> str:
|
|
|
|
|
"""Categorize a PR by its title and body."""
|
|
|
|
|
text = f"{title} {body}".lower()
|
|
|
|
|
for category, patterns in CATEGORY_RULES.items():
|
|
|
|
|
if category == "other":
|
|
|
|
|
continue
|
|
|
|
|
for pattern in patterns:
|
|
|
|
|
if re.search(pattern, text, re.IGNORECASE):
|
|
|
|
|
return category
|
|
|
|
|
return "other"
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Gitea API
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def _load_token() -> str:
|
|
|
|
|
try:
|
|
|
|
|
return open(TOKEN_PATH).read().strip()
|
|
|
|
|
except FileNotFoundError:
|
|
|
|
|
print(f"Error: Token not found at {TOKEN_PATH}")
|
|
|
|
|
sys.exit(1)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def api_get(path: str, token: str) -> Any:
|
|
|
|
|
req = urllib.request.Request(f"{GITEA_BASE}{path}")
|
|
|
|
|
req.add_header("Authorization", f"token {token}")
|
|
|
|
|
resp = urllib.request.urlopen(req, timeout=30)
|
|
|
|
|
return json.loads(resp.read())
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_open_prs(repo: str, token: str) -> list[dict]:
|
|
|
|
|
"""Fetch all open PRs for a repo."""
|
|
|
|
|
prs = []
|
|
|
|
|
page = 1
|
|
|
|
|
while True:
|
|
|
|
|
try:
|
|
|
|
|
batch = api_get(f"/repos/{ORG}/{repo}/pulls?state=open&limit=50&page={page}", token)
|
|
|
|
|
if not batch:
|
|
|
|
|
break
|
|
|
|
|
prs.extend(batch)
|
|
|
|
|
if len(batch) < 50:
|
|
|
|
|
break
|
|
|
|
|
page += 1
|
|
|
|
|
except Exception:
|
|
|
|
|
break
|
|
|
|
|
return prs
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def get_issue_state(repo: str, issue_num: int, token: str) -> Optional[str]:
|
|
|
|
|
"""Check if a referenced issue is still open."""
|
|
|
|
|
try:
|
|
|
|
|
issue = api_get(f"/repos/{ORG}/{repo}/issues/{issue_num}", token)
|
|
|
|
|
return issue.get("state", "unknown")
|
|
|
|
|
except Exception:
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_referenced_issues(pr_body: str, pr_title: str) -> list[int]:
|
|
|
|
|
"""Extract issue numbers referenced in PR body/title."""
|
|
|
|
|
text = f"{pr_title} {pr_body}"
|
|
|
|
|
return [int(m) for m in re.findall(r'#(\d+)', text)]
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def find_duplicates(prs: list[dict]) -> list[tuple[dict, dict]]:
|
|
|
|
|
"""Find PRs that reference the same issue."""
|
|
|
|
|
issue_to_prs: dict[int, list[dict]] = {}
|
|
|
|
|
for pr in prs:
|
|
|
|
|
refs = find_referenced_issues(pr.get("body", ""), pr.get("title", ""))
|
|
|
|
|
for issue_num in refs:
|
|
|
|
|
issue_to_prs.setdefault(issue_num, []).append(pr)
|
|
|
|
|
|
|
|
|
|
duplicates = []
|
|
|
|
|
for issue_num, pr_list in issue_to_prs.items():
|
|
|
|
|
if len(pr_list) > 1:
|
|
|
|
|
# Pair up duplicates
|
|
|
|
|
for i in range(len(pr_list)):
|
|
|
|
|
for j in range(i + 1, len(pr_list)):
|
|
|
|
|
duplicates.append((pr_list[i], pr_list[j]))
|
|
|
|
|
|
|
|
|
|
return duplicates
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Triage
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def triage_repo(repo: str, token: str) -> dict:
|
|
|
|
|
"""Triage all open PRs for a repo."""
|
|
|
|
|
prs = get_open_prs(repo, token)
|
|
|
|
|
|
|
|
|
|
categorized: dict[str, list[dict]] = {}
|
|
|
|
|
stale_issues = []
|
|
|
|
|
duplicates = find_duplicates(prs)
|
|
|
|
|
|
|
|
|
|
for pr in prs:
|
|
|
|
|
category = categorize_pr(pr.get("title", ""), pr.get("body", ""))
|
|
|
|
|
categorized.setdefault(category, []).append(pr)
|
|
|
|
|
|
|
|
|
|
# Check referenced issues
|
|
|
|
|
refs = find_referenced_issues(pr.get("body", ""), pr.get("title", ""))
|
|
|
|
|
for issue_num in refs:
|
|
|
|
|
state = get_issue_state(repo, issue_num, token)
|
|
|
|
|
if state == "closed":
|
|
|
|
|
stale_issues.append({"pr": pr["number"], "issue": issue_num, "repo": repo})
|
|
|
|
|
|
|
|
|
|
return {
|
|
|
|
|
"repo": repo,
|
|
|
|
|
"total_prs": len(prs),
|
|
|
|
|
"by_category": {k: len(v) for k, v in categorized.items()},
|
|
|
|
|
"categorized": categorized,
|
|
|
|
|
"duplicates": [(a["number"], b["number"]) for a, b in duplicates],
|
|
|
|
|
"stale_issues": stale_issues,
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def triage_all(repos: list[str], token: str) -> list[dict]:
|
|
|
|
|
"""Triage all repos."""
|
|
|
|
|
results = []
|
|
|
|
|
for repo in repos:
|
|
|
|
|
print(f" Triaging {repo}...", file=sys.stderr)
|
|
|
|
|
try:
|
|
|
|
|
result = triage_repo(repo, token)
|
|
|
|
|
results.append(result)
|
|
|
|
|
except Exception as e:
|
|
|
|
|
print(f" Error triaging {repo}: {e}", file=sys.stderr)
|
|
|
|
|
results.append({"repo": repo, "error": str(e)})
|
|
|
|
|
return results
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
# Report
|
|
|
|
|
# ---------------------------------------------------------------------------
|
|
|
|
|
|
|
|
|
|
def generate_markdown_report(results: list[dict]) -> str:
|
|
|
|
|
"""Generate a markdown triage report."""
|
|
|
|
|
total_prs = sum(r.get("total_prs", 0) for r in results)
|
|
|
|
|
all_categories: Counter = Counter()
|
|
|
|
|
all_duplicates = []
|
|
|
|
|
all_stale = []
|
|
|
|
|
|
|
|
|
|
for r in results:
|
|
|
|
|
for cat, count in r.get("by_category", {}).items():
|
|
|
|
|
all_categories[cat] += count
|
|
|
|
|
all_duplicates.extend(r.get("duplicates", []))
|
|
|
|
|
all_stale.extend(r.get("stale_issues", []))
|
|
|
|
|
|
|
|
|
|
lines = [
|
|
|
|
|
"# PR Triage Report",
|
|
|
|
|
"",
|
|
|
|
|
f"Generated: {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M UTC')}",
|
|
|
|
|
"",
|
|
|
|
|
"## Summary",
|
|
|
|
|
"",
|
|
|
|
|
f"| Metric | Count |",
|
|
|
|
|
f"|--------|-------|",
|
|
|
|
|
f"| Total open PRs | {total_prs} |",
|
|
|
|
|
f"| Repos scanned | {len(results)} |",
|
|
|
|
|
f"| Duplicates found | {len(all_duplicates)} |",
|
|
|
|
|
f"| Stale (issue closed) | {len(all_stale)} |",
|
|
|
|
|
"",
|
|
|
|
|
"## By Category",
|
|
|
|
|
"",
|
|
|
|
|
"| Category | Count |",
|
|
|
|
|
"|----------|-------|",
|
|
|
|
|
]
|
|
|
|
|
|
|
|
|
|
for cat, count in all_categories.most_common():
|
|
|
|
|
lines.append(f"| {cat} | {count} |")
|
|
|
|
|
|
|
|
|
|
if all_duplicates:
|
|
|
|
|
lines.extend(["", "## Duplicates (same issue referenced)", ""])
|
|
|
|
|
for a, b in all_duplicates:
|
|
|
|
|
lines.append(f"- PR #{a} and PR #{b}")
|
|
|
|
|
|
|
|
|
|
if all_stale:
|
|
|
|
|
lines.extend(["", "## Stale PRs (referenced issue is closed)", ""])
|
|
|
|
|
for s in all_stale:
|
|
|
|
|
lines.append(f"- {s['repo']} PR #{s['pr']} → issue #{s['issue']} (closed)")
|
|
|
|
|
|
|
|
|
|
# Per-repo detail
|
|
|
|
|
for r in results:
|
|
|
|
|
if r.get("error"):
|
|
|
|
|
lines.extend(["", f"## {r['repo']} — ERROR", "", f"```{r['error']}```"])
|
|
|
|
|
continue
|
|
|
|
|
|
|
|
|
|
lines.extend([f"", f"## {r['repo']} ({r.get('total_prs', 0)} open PRs)", ""])
|
|
|
|
|
for cat, prs in r.get("categorized", {}).items():
|
|
|
|
|
if not prs:
|
|
|
|
|
continue
|
|
|
|
|
lines.append(f"
|