296 lines
11 KiB
Python
296 lines
11 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import re
|
|
from datetime import datetime, timezone
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from urllib.request import Request, urlopen
|
|
|
|
API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
|
|
ORG = "Timmy_Foundation"
|
|
DEFAULT_TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
|
|
DEFAULT_OUTPUT = "reports/2026-04-17-the-door-fleet-work-orders-audit.md"
|
|
|
|
|
|
def extract_issue_numbers(body: str) -> list[int]:
    """Return every distinct issue number referenced as ``#N`` in *body*.

    Numbers are returned in first-seen order; duplicates are dropped.
    A falsy *body* (empty string or None) yields an empty list.
    """
    # dict keys preserve insertion order and deduplicate in a single pass.
    ordered: dict[int, None] = {}
    for hit in re.finditer(r"#(\d+)", body or ""):
        ordered.setdefault(int(hit.group(1)), None)
    return list(ordered)
|
|
|
|
|
|
def api_get(repo: str, path: str, token: str) -> Any:
    """GET ``path`` under the org/repo on the forge API and return the decoded JSON.

    *path* must start with "/" (e.g. "/pulls?state=open"); the token is sent
    via the Gitea ``Authorization: token <...>`` header scheme.
    """
    url = f"{API_BASE}/repos/{ORG}/{repo}{path}"
    request = Request(url, headers={"Authorization": f"token {token}"})
    # 30 s timeout keeps a stalled forge from hanging the whole audit run.
    with urlopen(request, timeout=30) as response:
        payload = response.read()
    return json.loads(payload.decode())
|
|
|
|
|
|
def fetch_open_prs(repo: str, token: str) -> list[dict[str, Any]]:
    """Return every open pull request in *repo*, following pagination.

    Pages of up to 100 PRs are requested until the forge returns an empty page.
    """
    collected: list[dict[str, Any]] = []
    page = 1
    while batch := api_get(repo, f"/pulls?state=open&limit=100&page={page}", token):
        collected.extend(batch)
        page += 1
    return collected
|
|
|
|
|
|
def fetch_live_open_issue_count(repo: str, token: str) -> int:
    """Count *repo*'s open issues, excluding pull requests.

    Gitea's /issues endpoint also lists PRs; entries carrying a
    ``pull_request`` key are skipped so the count covers real issues only.
    """
    count = 0
    page = 1
    while batch := api_get(repo, f"/issues?state=open&limit=100&page={page}", token):
        count += sum(1 for entry in batch if not entry.get("pull_request"))
        page += 1
    return count
|
|
|
|
|
|
def parse_claimed_summary(body: str) -> tuple[int | None, int | None]:
|
|
issue_match = re.search(r"has\s+(\d+)\s+real issues", body or "", flags=re.IGNORECASE)
|
|
pr_match = re.search(r"and\s+(\d+)\s+open PRs", body or "", flags=re.IGNORECASE)
|
|
claimed_open_issues = int(issue_match.group(1)) if issue_match else None
|
|
claimed_open_prs = int(pr_match.group(1)) if pr_match else None
|
|
return claimed_open_issues, claimed_open_prs
|
|
|
|
|
|
def summarize_open_pr_coverage(issue_num: int, open_prs: list[dict[str, Any]]) -> str:
    """Describe which open PRs reference issue *issue_num*.

    A PR counts as coverage when its title, body, or head branch name
    mentions "#<issue_num>". Returns a comma-separated list like
    "open PR #12, open PR #34", or "none" when no open PR matches.

    Bug fix: the previous substring test (``f"#{issue_num}" in text``)
    matched numeric prefixes, so issue #7 falsely matched a "#75"
    reference. A negative lookahead now requires the number to end.
    """
    # "#N" not followed by another digit — exact issue-number match only.
    ref_pattern = re.compile(rf"#{issue_num}(?!\d)")
    matches: list[str] = []
    seen: set[int] = set()
    for pr in open_prs:
        pr_num = pr["number"]
        # Guard against duplicate PR entries in the input list.
        if pr_num in seen:
            continue
        text = "\n".join(
            [
                pr.get("title") or "",
                pr.get("body") or "",
                (pr.get("head") or {}).get("ref") or "",
            ]
        )
        if not ref_pattern.search(text):
            continue
        seen.add(pr_num)
        matches.append(f"open PR #{pr_num}")
    return ", ".join(matches) if matches else "none"
|
|
|
|
|
|
def classify_issue_reference(ref_issue: dict[str, Any], open_prs: list[dict[str, Any]]) -> dict[str, Any]:
    """Build an audit row for a referenced issue.

    Classification: "closed_issue" when the issue state is closed, otherwise
    "open_with_current_pr" / "open_no_current_pr" depending on whether any
    open PR in *open_prs* references the issue number.
    """
    number = ref_issue["number"]
    state = ref_issue.get("state") or "unknown"
    coverage = summarize_open_pr_coverage(number, open_prs)
    if state == "closed":
        label = "closed_issue"
    elif coverage != "none":
        label = "open_with_current_pr"
    else:
        label = "open_no_current_pr"
    return {
        "number": number,
        "state": state,
        "classification": label,
        "title": ref_issue.get("title") or "",
        "current_pr_coverage": coverage,
        "url": ref_issue.get("html_url") or ref_issue.get("url") or "",
    }
|
|
|
|
|
|
def classify_pr_reference(repo: str, pr_num: int, token: str) -> dict[str, Any]:
    """Fetch PR *pr_num* from the forge and build an audit row for it.

    Classification: "merged_pr" when merged, "open_pr" when still open,
    otherwise "closed_unmerged_pr" (a closed-without-merge attempt).
    """
    pr = api_get(repo, f"/pulls/{pr_num}", token)
    state = pr.get("state") or "unknown"
    merged = bool(pr.get("merged"))
    if merged:
        label = "merged_pr"
    elif state == "open":
        label = "open_pr"
    else:
        label = "closed_unmerged_pr"
    return {
        "number": pr_num,
        "state": state,
        "merged": merged,
        "classification": label,
        "title": pr.get("title") or "",
        "head": (pr.get("head") or {}).get("ref") or "",
        "url": pr.get("html_url") or pr.get("url") or "",
    }
|
|
|
|
|
|
def table(rows: list[dict[str, Any]], columns: list[tuple[str, str]]) -> str:
    """Render *rows* as a Markdown table from (header, row-key) column specs.

    "number" cells are prefixed with "#" (unless empty); newlines inside any
    cell are flattened to spaces so rows stay on one Markdown line. An empty
    *rows* list yields a one-cell "None" placeholder table.
    """
    if not rows:
        return "| None |\n|---|\n| None |"
    titles = [header for header, _ in columns]
    row_keys = [key for _, key in columns]
    out = [
        "| " + " | ".join(titles) + " |",
        "|" + "|".join(["---"] * len(titles)) + "|",
    ]
    for row in rows:
        cells: list[str] = []
        for key in row_keys:
            cell = row.get(key, "")
            if key == "number" and cell != "":
                cell = f"#{cell}"
            cells.append(str(cell).replace("\n", " "))
        out.append("| " + " | ".join(cells) + " |")
    return "\n".join(out)
|
|
|
|
|
|
def render_report(
    *,
    source_issue: int,
    source_title: str,
    generated_at: str,
    claimed_open_issues: int | None,
    claimed_open_prs: int | None,
    live_open_issues: int,
    live_open_prs: int,
    issue_rows: list[dict[str, Any]],
    pr_rows: list[dict[str, Any]],
) -> str:
    """Render the full Markdown audit report and return it as one string.

    Keyword-only inputs:
        source_issue / source_title: the work-order issue being audited.
        generated_at: ISO-8601 UTC timestamp for the "Generated" line.
        claimed_open_issues / claimed_open_prs: counts parsed from the issue
            body, or None when the body did not state them ("unknown" in output).
        live_open_issues / live_open_prs: counts fetched from live forge state.
        issue_rows / pr_rows: audit rows from classify_issue_reference /
            classify_pr_reference; each carries a "classification" key.

    The returned report always ends with a trailing newline.
    """
    # Bucket rows by the classification labels the classify_* helpers assign.
    open_with_current_pr = [row for row in issue_rows if row["classification"] == "open_with_current_pr"]
    open_no_current_pr = [row for row in issue_rows if row["classification"] == "open_no_current_pr"]
    closed_issues = [row for row in issue_rows if row["classification"] == "closed_issue"]
    merged_prs = [row for row in pr_rows if row["classification"] == "merged_pr"]
    open_pr_refs = [row for row in pr_rows if row["classification"] == "open_pr"]
    closed_unmerged_prs = [row for row in pr_rows if row["classification"] == "closed_unmerged_pr"]

    # Claimed-vs-live drift bullets; None claims render as the word "unknown".
    drift_lines = [
        f"- The issue body claimed {claimed_open_issues if claimed_open_issues is not None else 'unknown'} real issues and {claimed_open_prs if claimed_open_prs is not None else 'unknown'} open PRs.",
        f"- Live repo state now shows {live_open_issues} open issues and {live_open_prs} open PRs.",
        f"- Referenced issues now break down into {len(closed_issues)} closed, {len(open_with_current_pr)} open_with_current_pr, and {len(open_no_current_pr)} open_no_current_pr.",
        f"- Referenced PRs now break down into {len(merged_prs)} merged_pr, {len(open_pr_refs)} open_pr, and {len(closed_unmerged_prs)} closed_unmerged_pr.",
    ]

    # One line per report row; drift bullets are spliced in with *drift_lines.
    return "\n".join(
        [
            f"# The Door Fleet Work Orders Audit — issue #{source_issue}",
            "",
            f"Generated: {generated_at}",
            f"Source issue: `{source_title}`",
            "",
            "## Source Snapshot",
            "",
            "Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.",
            "",
            "## Live Summary",
            "",
            f"- Referenced issues audited: {len(issue_rows)}",
            f"- Referenced PRs audited: {len(pr_rows)}",
            f"- Live repo open issues: {live_open_issues}",
            f"- Live repo open PRs: {live_open_prs}",
            f"- Open referenced issues with current PR coverage: {len(open_with_current_pr)}",
            f"- Open referenced issues with no current PR coverage: {len(open_no_current_pr)}",
            f"- Closed referenced issues: {len(closed_issues)}",
            f"- Closed-unmerged referenced PRs: {len(closed_unmerged_prs)}",
            "",
            "## Issue Body Drift",
            "",
            *drift_lines,
            "",
            "## Referenced Issue Snapshot",
            "",
            table(
                issue_rows,
                [
                    ("Issue", "number"),
                    ("State", "state"),
                    ("Classification", "classification"),
                    ("Current PR Coverage", "current_pr_coverage"),
                    ("Title", "title"),
                ],
            ),
            "",
            "## Referenced PR Snapshot",
            "",
            table(
                pr_rows,
                [
                    ("PR", "number"),
                    ("State", "state"),
                    ("Merged", "merged"),
                    ("Classification", "classification"),
                    ("Head", "head"),
                    ("Title", "title"),
                ],
            ),
            "",
            "## Recommended Next Actions",
            "",
            "1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.",
            "2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.",
            "3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.",
            "4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.",
            "5. This audit preserves operator memory; it does not claim all referenced work orders are complete.",
        ]
    ) + "\n"
|
|
|
|
|
|
def build_audit(repo: str, issue_number: int, token: str) -> tuple[dict[str, Any], list[dict[str, Any]], list[dict[str, Any]]]:
    """Fetch the source issue, classify every ``#N`` it references, and gather live counts.

    Returns ``(metadata, issue_rows, pr_rows)`` where metadata holds the
    source title, the counts claimed in the body, and the live forge counts.
    """
    source_issue = api_get(repo, f"/issues/{issue_number}", token)
    body = source_issue.get("body") or ""
    refs = extract_issue_numbers(body)
    open_prs = fetch_open_prs(repo, token)
    claimed_open_issues, claimed_open_prs = parse_claimed_summary(body)

    issue_rows: list[dict[str, Any]] = []
    pr_rows: list[dict[str, Any]] = []
    for ref in refs:
        # Gitea serves PRs through the issues endpoint; the "pull_request"
        # key distinguishes them from real issues.
        item = api_get(repo, f"/issues/{ref}", token)
        if item.get("pull_request"):
            pr_rows.append(classify_pr_reference(repo, ref, token))
        else:
            issue_rows.append(classify_issue_reference(item, open_prs))

    metadata = {
        "source_title": source_issue.get("title") or "",
        "claimed_open_issues": claimed_open_issues,
        "claimed_open_prs": claimed_open_prs,
        "live_open_issues": fetch_live_open_issue_count(repo, token),
        "live_open_prs": len(open_prs),
    }
    return metadata, issue_rows, pr_rows
|
|
|
|
|
|
def main() -> int:
    """CLI entry point: build the audit and write the Markdown report to disk.

    Reads the API token from --token-file, audits --issue in --repo, writes
    the rendered report to --output (creating parent dirs), prints the output
    path, and returns 0.
    """
    parser = argparse.ArgumentParser(description="Audit The Door fleet work orders issue against live forge state.")
    parser.add_argument("--repo", default="the-door")
    parser.add_argument("--issue", type=int, default=75)
    parser.add_argument("--token-file", default=DEFAULT_TOKEN_PATH)
    parser.add_argument("--output", default=DEFAULT_OUTPUT)
    args = parser.parse_args()

    token = Path(args.token_file).read_text(encoding="utf-8").strip()
    stamp = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    metadata, issue_rows, pr_rows = build_audit(args.repo, args.issue, token)
    report = render_report(
        source_issue=args.issue,
        source_title=metadata["source_title"],
        generated_at=stamp,
        claimed_open_issues=metadata["claimed_open_issues"],
        claimed_open_prs=metadata["claimed_open_prs"],
        live_open_issues=metadata["live_open_issues"],
        live_open_prs=metadata["live_open_prs"],
        issue_rows=issue_rows,
        pr_rows=pr_rows,
    )
    destination = Path(args.output)
    destination.parent.mkdir(parents=True, exist_ok=True)
    destination.write_text(report, encoding="utf-8")
    print(destination)
    return 0
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|