Files
the-nexus/scripts/timmy_config_pr_backlog_audit.py
2026-04-15 21:45:22 -04:00

258 lines
10 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import re
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
from urllib.error import HTTPError
from urllib.request import Request, urlopen
# Base URL that api_get() prepends to every request path.
API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
# Owner segment used in every "/repos/{ORG}/..." path built by this script.
ORG = "Timmy_Foundation"
# Default for --source-repo: the repository the tracking issue is fetched from.
SOURCE_REPO = "the-nexus"
# Default for --target-repo: the repository whose open PRs are audited.
TARGET_REPO = "timmy-config"
# Default for --token-file: main() reads this file and strips it to get the API token.
DEFAULT_TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
# Default for --output: relative path the rendered Markdown report is written to.
DEFAULT_OUTPUT = "reports/2026-04-16-timmy-config-pr-backlog-audit.md"
def api_get(path: str, token: str) -> Any:
    """Fetch ``API_BASE + path`` and return the decoded JSON payload.

    The request carries an ``Authorization: token <token>`` header and uses a
    30-second timeout. Exceptions raised by ``urlopen`` or by JSON decoding
    propagate to the caller unchanged.
    """
    url = API_BASE + path
    auth_headers = {"Authorization": f"token {token}"}
    request = Request(url, headers=auth_headers)
    with urlopen(request, timeout=30) as response:
        raw_text = response.read().decode()
    return json.loads(raw_text)
def extract_issue_refs(title: str = "", body: str = "", head: str = "") -> list[int]:
    """Return the issue numbers a PR refers to, de-duplicated in first-seen order.

    Explicit ``#123`` references are collected from the title, body and head
    branch name (in that order). Only when none are found does the function
    fall back to bare numbers in the branch name, i.e. digit runs bounded on
    both sides by the start/end of the name or by ``/`` or ``-``
    (``fix/123-login`` -> ``[123]``).

    Args:
        title: PR title; may be empty.
        body: PR description; may be empty.
        head: Head branch name; may be empty.

    Returns:
        A list of unique issue numbers; empty when nothing is referenced.
    """
    text = " ".join(filter(None, [title, body, head]))
    # dict.fromkeys keeps the first occurrence of each number, preserving order.
    refs = list(dict.fromkeys(int(m.group(1)) for m in re.finditer(r"#(\d+)", text)))
    if not refs and head:
        # Lookarounds instead of consuming groups: a delimiter shared by two
        # adjacent numbers ("fix/12-34") must count for both of them. Consuming
        # the trailing "-" would hide the leading delimiter of the next number
        # and silently drop it.
        branch_numbers = re.finditer(r"(?<![^/-])(\d+)(?![^/-])", head)
        refs = list(dict.fromkeys(int(m.group(1)) for m in branch_numbers))
    return refs
def summarize_backlog(backlog: list[dict[str, Any]], now_iso: str | None = None, stale_days: int = 7) -> dict[str, Any]:
    """Classify open PRs into mergeable, unreviewed, stale and duplicate buckets.

    Args:
        backlog: PR records. Each must carry ``updated_at``; ``issue_refs``,
            ``title``, ``body``, ``head``, ``review_count``,
            ``requested_reviewers`` and ``mergeable`` are read when present.
            Records without issue refs get ``issue_refs`` filled in place.
        now_iso: ISO timestamp used as "now"; the current UTC time when falsy.
        stale_days: A PR is stale when its last update is older than this.

    Returns:
        A dict with the four counts, the duplicate groups (PRs sharing the
        exact same issue-ref sequence, ordered by first ref) and the PR lists
        behind each count.
    """
    if now_iso:
        reference_time = _parse_iso(now_iso)
    else:
        reference_time = datetime.now(timezone.utc)

    prs_by_refs: dict[tuple[int, ...], list[dict[str, Any]]] = {}
    unreviewed = []
    stale_prs = []
    ready_to_merge = []

    for entry in backlog:
        issue_numbers = entry.get("issue_refs")
        if not issue_numbers:
            # Derive the refs from the PR text and cache them on the record.
            issue_numbers = extract_issue_refs(
                entry.get("title") or "",
                entry.get("body") or "",
                entry.get("head") or "",
            )
            entry["issue_refs"] = issue_numbers

        group_key = tuple(issue_numbers)
        if group_key:
            bucket = prs_by_refs.get(group_key)
            if bucket is None:
                prs_by_refs[group_key] = [entry]
            else:
                bucket.append(entry)

        review_signals = entry.get("review_count", 0) + entry.get("requested_reviewers", 0)
        if review_signals == 0:
            unreviewed.append(entry)

        last_update = _parse_iso(entry["updated_at"])
        age = reference_time - last_update
        if age > timedelta(days=stale_days):
            stale_prs.append(entry)

        if entry.get("mergeable"):
            ready_to_merge.append(entry)

    # Only ref sequences shared by two or more PRs count as duplicates.
    duplicate_groups = []
    for group_key, members in prs_by_refs.items():
        if len(members) > 1:
            duplicate_groups.append({"issue_refs": list(group_key), "prs": members})

    def _first_ref(group: dict[str, Any]) -> int:
        numbers = group["issue_refs"]
        return numbers[0] if numbers else 10**9

    duplicate_groups.sort(key=_first_ref)

    return {
        "total_open_prs": len(backlog),
        "mergeable_count": len(ready_to_merge),
        "missing_reviewer_count": len(unreviewed),
        "stale_count": len(stale_prs),
        "duplicate_issue_groups": duplicate_groups,
        "mergeable_prs": ready_to_merge,
        "missing_reviewer_prs": unreviewed,
        "stale_prs": stale_prs,
    }
def render_report(*, source_issue: int, source_title: str, summary: dict[str, Any], backlog: list[dict[str, Any]], generated_at: str) -> str:
    """Render the backlog audit as a Markdown document.

    Args:
        source_issue: Number of the tracking issue; used in the heading and in
            every narrative reference to the issue.
        source_title: Title of the tracking issue, shown verbatim in backticks.
        summary: Result of ``summarize_backlog`` (counts, duplicate groups and
            the PR lists behind them).
        backlog: PR records for the raw snapshot table; only the first 40 are
            listed, followed by a "+N more" row.
        generated_at: Timestamp string printed as-is in the header.

    Returns:
        The full report text, terminated by a single newline.
    """

    def cell(value: Any) -> str:
        # A raw "|" or a line break inside a table cell splits the Markdown
        # row, so PR titles and branch names are neutralised before insertion.
        return str(value).replace("|", "\\|").replace("\r", " ").replace("\n", " ")

    # NOTE(review): the "9 open PRs" / "above that historical snapshot" wording
    # and the "7 days" staleness label are fixed text; they are not derived
    # from `summary` and may not match a run with different inputs.
    lines = [
        f"# Timmy-config PR Backlog Audit — the-nexus #{source_issue}",
        "",
        f"Generated: {generated_at}",
        f"Source issue: `{source_title}`",
        "",
        "## Source Snapshot",
        "",
        f"Issue #{source_issue} claims timmy-config had 9 open PRs and the highest PR backlog in the org during the original triage snapshot.",
        "This audit re-queries the live PR backlog and classifies it against current forge state instead of trusting that stale count.",
        "",
        "## Live Summary",
        "",
        f"- Open PRs on `{ORG}/{TARGET_REPO}`: {summary['total_open_prs']}",
        f"- Mergeable right now: {summary['mergeable_count']}",
        f"- PRs with no reviewers or requested reviewers: {summary['missing_reviewer_count']}",
        f"- Stale PRs older than 7 days: {summary['stale_count']}",
        f"- Duplicate issue groups detected: {len(summary['duplicate_issue_groups'])}",
        "",
        "## Issue Body Drift",
        "",
        f"The body of #{source_issue} is materially stale: it references a 9-PR backlog, while the live audit found the current open-PR count above that historical snapshot.",
        "This means the issue should be treated as a process/report problem, not as a direct live-merge instruction.",
        "",
        "## Duplicate Issue Groups",
        "",
    ]

    if summary["duplicate_issue_groups"]:
        lines.extend(["| Issue refs | PRs |", "|---|---|"])
        for group in summary["duplicate_issue_groups"]:
            refs = ", ".join(f"#{n}" for n in group["issue_refs"]) or "(none)"
            prs = "; ".join(f"#{pr['number']} ({cell(pr['head'])})" for pr in group["prs"])
            lines.append(f"| {refs} | {prs} |")
    else:
        lines.append("No duplicate issue groups detected in the live backlog.")

    lines.extend([
        "",
        "## Reviewer Coverage",
        "",
    ])
    if summary["missing_reviewer_prs"]:
        lines.extend(["| PR | Title | Updated |", "|---|---|---|"])
        # Cap the table at 20 rows; the remainder is summarised in one row.
        for pr in summary["missing_reviewer_prs"][:20]:
            lines.append(f"| #{pr['number']} | {cell(pr['title'])} | {pr['updated_at'][:10]} |")
        if len(summary["missing_reviewer_prs"]) > 20:
            lines.append(f"| ... | ... | +{len(summary['missing_reviewer_prs']) - 20} more |")
    else:
        lines.append("All open PRs currently show reviewer coverage signals.")

    lines.extend([
        "",
        "## Mergeable Snapshot",
        "",
    ])
    if summary["mergeable_prs"]:
        lines.extend(["| PR | Title | Head branch |", "|---|---|---|"])
        for pr in summary["mergeable_prs"][:20]:
            lines.append(f"| #{pr['number']} | {cell(pr['title'])} | `{cell(pr['head'])}` |")
        if len(summary["mergeable_prs"]) > 20:
            lines.append(f"| ... | ... | +{len(summary['mergeable_prs']) - 20} more mergeable PRs |")
    else:
        lines.append("No mergeable PRs reported in the live backlog snapshot.")

    lines.extend([
        "",
        "## Stale PRs",
        "",
    ])
    if summary["stale_prs"]:
        lines.extend(["| PR | Title | Updated |", "|---|---|---|"])
        for pr in summary["stale_prs"]:
            lines.append(f"| #{pr['number']} | {cell(pr['title'])} | {pr['updated_at'][:10]} |")
    else:
        lines.append("No stale PRs older than 7 days were detected in the live snapshot.")

    lines.extend([
        "",
        "## Recommended Next Actions",
        "",
        "1. Use the duplicate-issue groups to collapse obviously redundant PRs before attempting any merge sweep.",
        "2. Assign reviewers (or request them) on the PRs with zero reviewer coverage so the backlog becomes reviewable instead of merely mergeable.",
        "3. Prioritize mergeable PRs with unique issue refs and recent updates for the next burndown pass.",
        f"4. Treat this report as the live reference for #{source_issue}; the original issue body is now a stale ops snapshot.",
        "",
        "## Raw Backlog Snapshot",
        "",
        "| PR | Mergeable | Review signals | Issue refs |",
        "|---|---|---|---|",
    ])
    for pr in backlog[:40]:
        refs = ", ".join(f"#{n}" for n in pr.get("issue_refs", [])) or "(none)"
        review_signals = pr.get("review_count", 0) + pr.get("requested_reviewers", 0)
        # .get() mirrors summarize_backlog, which also tolerates a record
        # without a "mergeable" key instead of raising KeyError.
        lines.append(f"| #{pr['number']} | {pr.get('mergeable', False)} | {review_signals} | {refs} |")
    if len(backlog) > 40:
        lines.append(f"| ... | ... | ... | +{len(backlog) - 40} more PRs |")

    return "\n".join(lines) + "\n"
def _requested_reviewer_count(pr: dict[str, Any], repo: str, token: str) -> int:
    """Return how many reviewers are currently requested on *pr*."""
    # Prefer the list embedded in the pull-request payload when the server
    # provides one: it needs no extra request.
    # NOTE(review): the dedicated endpoint below may not support GET on every
    # forge version; a rejected request is swallowed and counted as zero.
    embedded = pr.get("requested_reviewers")
    if isinstance(embedded, list):
        return len(embedded)
    requested = _safe_api_get(f"/repos/{ORG}/{repo}/pulls/{pr['number']}/requested_reviewers", token) or {}
    if isinstance(requested, dict):
        requested = requested.get("users") or []
    return len(requested)


def collect_backlog(repo: str, token: str, max_pages: int = 5) -> list[dict[str, Any]]:
    """Fetch the open pull requests of ``ORG/repo`` with their review signals.

    Pages through the open-PR listing until the server returns an empty page
    or ``max_pages`` pages have been read. For each PR the reviews endpoint is
    queried (failures count as "no reviews") and requested reviewers are
    counted.

    Args:
        repo: Repository name inside ``ORG``.
        token: API token passed to every request.
        max_pages: Safety cap on the number of listing pages to read.

    Returns:
        One dict per PR with the keys ``number``, ``title``, ``body``,
        ``head``, ``mergeable``, ``updated_at``, ``review_count``,
        ``requested_reviewers`` and ``issue_refs``.

    Raises:
        Whatever ``api_get`` raises for the listing request itself.
    """
    prs: list[dict[str, Any]] = []
    for page in range(1, max_pages + 1):
        batch = api_get(f"/repos/{ORG}/{repo}/pulls?state=open&limit=100&page={page}", token)
        # An empty page is the only reliable end-of-list signal: the server may
        # cap the page size below the requested limit=100, so a "short" page
        # does not prove it was the last one.
        if not batch:
            break
        for pr in batch:
            number = pr["number"]
            title = pr.get("title") or ""
            body = pr.get("body") or ""
            head = (pr.get("head") or {}).get("ref") or ""
            reviews = _safe_api_get(f"/repos/{ORG}/{repo}/pulls/{number}/reviews", token) or []
            prs.append({
                "number": number,
                "title": title,
                "body": body,
                "head": head,
                "mergeable": bool(pr.get("mergeable")),
                # Epoch fallback keeps downstream date parsing working when the
                # payload carries neither timestamp.
                "updated_at": pr.get("updated_at") or pr.get("created_at") or "1970-01-01T00:00:00Z",
                "review_count": len([r for r in reviews if r.get("state")]),
                "requested_reviewers": _requested_reviewer_count(pr, repo, token),
                "issue_refs": extract_issue_refs(title, body, head),
            })
    return prs
def _safe_api_get(path: str, token: str):
    """Best-effort variant of ``api_get``: return ``None`` on an HTTP error.

    Only ``HTTPError`` is absorbed; any other exception raised by ``api_get``
    still propagates.
    """
    try:
        payload = api_get(path, token)
    except HTTPError:
        return None
    return payload
def _parse_iso(value: str) -> datetime:
    """Parse an ISO-8601 timestamp into a timezone-aware ``datetime``.

    A ``Z`` suffix is rewritten to ``+00:00`` before parsing. Timestamps that
    carry no offset at all are interpreted as UTC, so the result can always be
    subtracted from an aware "now" without raising ``TypeError``.

    Raises:
        ValueError: If *value* is not a valid ISO-8601 timestamp.
    """
    parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    if parsed.tzinfo is None:
        # Mixing naive and aware datetimes in arithmetic raises TypeError.
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed
def main() -> int:
    """Run the audit from the command line and write the Markdown report.

    Reads the API token from ``--token-file``, fetches the tracking issue from
    ``--source-repo``, collects and summarises the open PRs of
    ``--target-repo``, writes the rendered report to ``--output`` (creating
    parent directories as needed) and prints that path.

    Returns:
        ``0`` on success; errors propagate as exceptions.
    """
    cli = argparse.ArgumentParser(description="Audit the live timmy-config PR backlog for the-nexus issue #1471.")
    cli.add_argument("--issue", type=int, default=1471)
    for flag, fallback in (
        ("--source-repo", SOURCE_REPO),
        ("--target-repo", TARGET_REPO),
        ("--output", DEFAULT_OUTPUT),
        ("--token-file", DEFAULT_TOKEN_PATH),
    ):
        cli.add_argument(flag, default=fallback)
    opts = cli.parse_args()

    token_text = Path(opts.token_file).read_text(encoding="utf-8")
    token = token_text.strip()

    tracking_issue = api_get(f"/repos/{ORG}/{opts.source_repo}/issues/{opts.issue}", token)
    open_prs = collect_backlog(opts.target_repo, token)
    overview = summarize_backlog(open_prs)
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")

    markdown = render_report(
        source_issue=opts.issue,
        source_title=tracking_issue.get("title") or "",
        summary=overview,
        backlog=open_prs,
        generated_at=stamp,
    )

    destination = Path(opts.output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(markdown, encoding="utf-8")
    print(destination)
    return 0
# Script entry point: run the audit and use main()'s return value as the
# process exit status.
if __name__ == "__main__":
    exit_status = main()
    raise SystemExit(exit_status)