#!/usr/bin/env python3
"""Audit The Door fleet work-orders issue against live forge state.

The source work-order issue references other issues and PRs by ``#N``.  This
script re-checks every reference against the live Gitea API, classifies each
one (closed / covered by a current open PR / uncovered, and merged / open /
closed-unmerged for PRs), and writes a markdown audit report so planning is
based on current forge state rather than the possibly stale issue body.
"""
from __future__ import annotations

import argparse
import json
import os
import re
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from urllib.request import Request, urlopen

API_BASE = "https://forge.alexanderwhitestone.com/api/v1"
ORG = "Timmy_Foundation"
DEFAULT_TOKEN_PATH = os.path.expanduser("~/.config/gitea/token")
DEFAULT_OUTPUT = "reports/2026-04-17-the-door-fleet-work-orders-audit.md"


def extract_issue_numbers(body: str) -> list[int]:
    """Return the distinct ``#N`` references in *body*, in first-seen order."""
    numbers: list[int] = []
    seen: set[int] = set()
    for match in re.finditer(r"#(\d+)", body or ""):
        value = int(match.group(1))
        if value in seen:
            continue
        seen.add(value)
        numbers.append(value)
    return numbers


def api_get(repo: str, path: str, token: str) -> Any:
    """GET ``/repos/{ORG}/{repo}{path}`` from the forge API and decode JSON."""
    req = Request(
        f"{API_BASE}/repos/{ORG}/{repo}{path}",
        headers={"Authorization": f"token {token}"},
    )
    with urlopen(req, timeout=30) as resp:
        return json.loads(resp.read().decode())


def fetch_open_prs(repo: str, token: str) -> list[dict[str, Any]]:
    """Return every open PR in *repo*, following pagination until exhausted."""
    prs: list[dict[str, Any]] = []
    page = 1
    while True:
        batch = api_get(repo, f"/pulls?state=open&limit=100&page={page}", token)
        if not batch:
            break
        prs.extend(batch)
        page += 1
    return prs


def fetch_live_open_issue_count(repo: str, token: str) -> int:
    """Count open issues in *repo*, excluding PRs that Gitea lists as issues."""
    total = 0
    page = 1
    while True:
        batch = api_get(repo, f"/issues?state=open&limit=100&page={page}", token)
        if not batch:
            break
        # The issues endpoint also returns PRs; a "pull_request" key marks them.
        total += sum(1 for item in batch if not item.get("pull_request"))
        page += 1
    return total


def parse_claimed_summary(body: str) -> tuple[int | None, int | None]:
    """Extract the (issues, PRs) counts claimed by the work-order body.

    Looks for the phrases "has N real issues" and "and N open PRs"; either
    value is ``None`` when the phrase is absent.
    """
    issue_match = re.search(r"has\s+(\d+)\s+real issues", body or "", flags=re.IGNORECASE)
    pr_match = re.search(r"and\s+(\d+)\s+open PRs", body or "", flags=re.IGNORECASE)
    claimed_open_issues = int(issue_match.group(1)) if issue_match else None
    claimed_open_prs = int(pr_match.group(1)) if pr_match else None
    return claimed_open_issues, claimed_open_prs


def summarize_open_pr_coverage(issue_num: int, open_prs: list[dict[str, Any]]) -> str:
    """Describe which open PRs reference ``#issue_num``; "none" if no PR does.

    A reference counts only on an exact number match: ``#7`` must not match
    inside ``#75``.  The previous substring check (``f"#{issue_num}" in text``)
    had exactly that false positive; the ``(?!\\d)`` lookahead rejects matches
    where the reference continues with more digits.
    """
    pattern = re.compile(rf"#{issue_num}(?!\d)")
    matches: list[str] = []
    seen: set[int] = set()
    for pr in open_prs:
        pr_num = pr["number"]
        if pr_num in seen:
            continue
        text = "\n".join(
            [
                pr.get("title") or "",
                pr.get("body") or "",
                (pr.get("head") or {}).get("ref") or "",
            ]
        )
        if not pattern.search(text):
            continue
        seen.add(pr_num)
        matches.append(f"open PR #{pr_num}")
    return ", ".join(matches) if matches else "none"


def classify_issue_reference(ref_issue: dict[str, Any], open_prs: list[dict[str, Any]]) -> dict[str, Any]:
    """Classify a referenced issue as closed / covered by an open PR / uncovered."""
    issue_num = ref_issue["number"]
    state = ref_issue.get("state") or "unknown"
    coverage = summarize_open_pr_coverage(issue_num, open_prs)
    if state == "closed":
        classification = "closed_issue"
    elif coverage != "none":
        classification = "open_with_current_pr"
    else:
        classification = "open_no_current_pr"
    return {
        "number": issue_num,
        "state": state,
        "classification": classification,
        "title": ref_issue.get("title") or "",
        "current_pr_coverage": coverage,
        "url": ref_issue.get("html_url") or ref_issue.get("url") or "",
    }


def classify_pr_reference(repo: str, pr_num: int, token: str) -> dict[str, Any]:
    """Fetch PR *pr_num* and classify it as merged / open / closed-unmerged."""
    pr = api_get(repo, f"/pulls/{pr_num}", token)
    state = pr.get("state") or "unknown"
    merged = bool(pr.get("merged"))
    if merged:
        classification = "merged_pr"
    elif state == "open":
        classification = "open_pr"
    else:
        classification = "closed_unmerged_pr"
    return {
        "number": pr_num,
        "state": state,
        "merged": merged,
        "classification": classification,
        "title": pr.get("title") or "",
        "head": (pr.get("head") or {}).get("ref") or "",
        "url": pr.get("html_url") or pr.get("url") or "",
    }


def table(rows: list[dict[str, Any]], columns: list[tuple[str, str]]) -> str:
    """Render *rows* as a markdown table with the given ``(header, key)`` columns.

    An empty *rows* list still yields a well-formed table: the real header
    row, a matching separator, and one "None" placeholder per column.  (The
    previous fallback emitted a one-column table regardless of how many
    columns were requested, producing malformed markdown.)
    """
    headers = [title for title, _ in columns]
    keys = [key for _, key in columns]
    header_line = "| " + " | ".join(headers) + " |"
    separator = "|" + "|".join(["---"] * len(headers)) + "|"
    if not rows:
        placeholder = "| " + " | ".join(["None"] * len(headers)) + " |"
        return "\n".join([header_line, separator, placeholder])
    lines = [header_line, separator]
    for row in rows:
        values: list[str] = []
        for key in keys:
            value = row.get(key, "")
            # Prefix issue/PR numbers with "#" so they read as forge references.
            if key == "number" and value != "":
                value = f"#{value}"
            # Embedded newlines would break the markdown row; flatten them.
            values.append(str(value).replace("\n", " "))
        lines.append("| " + " | ".join(values) + " |")
    return "\n".join(lines)


def render_report(
    *,
    source_issue: int,
    source_title: str,
    generated_at: str,
    claimed_open_issues: int | None,
    claimed_open_prs: int | None,
    live_open_issues: int,
    live_open_prs: int,
    issue_rows: list[dict[str, Any]],
    pr_rows: list[dict[str, Any]],
) -> str:
    """Assemble the full markdown audit report from classified rows and counts."""
    open_with_current_pr = [row for row in issue_rows if row["classification"] == "open_with_current_pr"]
    open_no_current_pr = [row for row in issue_rows if row["classification"] == "open_no_current_pr"]
    closed_issues = [row for row in issue_rows if row["classification"] == "closed_issue"]
    merged_prs = [row for row in pr_rows if row["classification"] == "merged_pr"]
    open_pr_refs = [row for row in pr_rows if row["classification"] == "open_pr"]
    closed_unmerged_prs = [row for row in pr_rows if row["classification"] == "closed_unmerged_pr"]
    drift_lines = [
        f"- The issue body claimed {claimed_open_issues if claimed_open_issues is not None else 'unknown'} real issues and {claimed_open_prs if claimed_open_prs is not None else 'unknown'} open PRs.",
        f"- Live repo state now shows {live_open_issues} open issues and {live_open_prs} open PRs.",
        f"- Referenced issues now break down into {len(closed_issues)} closed, {len(open_with_current_pr)} open_with_current_pr, and {len(open_no_current_pr)} open_no_current_pr.",
        f"- Referenced PRs now break down into {len(merged_prs)} merged_pr, {len(open_pr_refs)} open_pr, and {len(closed_unmerged_prs)} closed_unmerged_pr.",
    ]
    return "\n".join(
        [
            f"# The Door Fleet Work Orders Audit — issue #{source_issue}",
            "",
            f"Generated: {generated_at}",
            f"Source issue: `{source_title}`",
            "",
            "## Source Snapshot",
            "",
            "Issue #75 is a dated triage work-order sheet, not a normal feature request. The durable deliverable is a truth-restored audit of the referenced issue and PR set against live forge state.",
            "",
            "## Live Summary",
            "",
            f"- Referenced issues audited: {len(issue_rows)}",
            f"- Referenced PRs audited: {len(pr_rows)}",
            f"- Live repo open issues: {live_open_issues}",
            f"- Live repo open PRs: {live_open_prs}",
            f"- Open referenced issues with current PR coverage: {len(open_with_current_pr)}",
            f"- Open referenced issues with no current PR coverage: {len(open_no_current_pr)}",
            f"- Closed referenced issues: {len(closed_issues)}",
            f"- Closed-unmerged referenced PRs: {len(closed_unmerged_prs)}",
            "",
            "## Issue Body Drift",
            "",
            *drift_lines,
            "",
            "## Referenced Issue Snapshot",
            "",
            table(
                issue_rows,
                [
                    ("Issue", "number"),
                    ("State", "state"),
                    ("Classification", "classification"),
                    ("Current PR Coverage", "current_pr_coverage"),
                    ("Title", "title"),
                ],
            ),
            "",
            "## Referenced PR Snapshot",
            "",
            table(
                pr_rows,
                [
                    ("PR", "number"),
                    ("State", "state"),
                    ("Merged", "merged"),
                    ("Classification", "classification"),
                    ("Head", "head"),
                    ("Title", "title"),
                ],
            ),
            "",
            "## Recommended Next Actions",
            "",
            "1. Do not trust the original work-order body as live truth; use this audit artifact for current planning.",
            "2. Re-triage the open_no_current_pr issues individually before dispatching new work, because the old PR references are now stale.",
            "3. Treat closed_unmerged_pr references as historical attempts, not active review lanes.",
            "4. If future work orders are needed, generate them from live forge state instead of reusing the 2026-04-09 issue body.",
            "5. This audit preserves operator memory; it does not claim all referenced work orders are complete.",
        ]
    ) + "\n"


def build_audit(repo: str, issue_number: int, token: str) -> tuple[dict[str, Any], list[dict[str, Any]], list[dict[str, Any]]]:
    """Fetch the source issue, classify every reference, and gather live counts.

    Returns ``(metadata, issue_rows, pr_rows)`` where metadata carries the
    source title, the counts claimed by the issue body, and the live counts.
    """
    source_issue = api_get(repo, f"/issues/{issue_number}", token)
    body = source_issue.get("body") or ""
    refs = extract_issue_numbers(body)
    open_prs = fetch_open_prs(repo, token)
    claimed_open_issues, claimed_open_prs = parse_claimed_summary(body)
    issue_rows: list[dict[str, Any]] = []
    pr_rows: list[dict[str, Any]] = []
    for ref in refs:
        # Gitea serves PRs from the issues endpoint too; the "pull_request"
        # key distinguishes them so each reference takes the right path.
        issue_like = api_get(repo, f"/issues/{ref}", token)
        if issue_like.get("pull_request"):
            pr_rows.append(classify_pr_reference(repo, ref, token))
        else:
            issue_rows.append(classify_issue_reference(issue_like, open_prs))
    metadata = {
        "source_title": source_issue.get("title") or "",
        "claimed_open_issues": claimed_open_issues,
        "claimed_open_prs": claimed_open_prs,
        "live_open_issues": fetch_live_open_issue_count(repo, token),
        "live_open_prs": len(open_prs),
    }
    return metadata, issue_rows, pr_rows


def main() -> int:
    """CLI entry point: run the audit and write the markdown report to disk."""
    parser = argparse.ArgumentParser(description="Audit The Door fleet work orders issue against live forge state.")
    parser.add_argument("--repo", default="the-door")
    parser.add_argument("--issue", type=int, default=75)
    parser.add_argument("--token-file", default=DEFAULT_TOKEN_PATH)
    parser.add_argument("--output", default=DEFAULT_OUTPUT)
    args = parser.parse_args()
    token = Path(args.token_file).read_text(encoding="utf-8").strip()
    generated_at = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    metadata, issue_rows, pr_rows = build_audit(args.repo, args.issue, token)
    report = render_report(
        source_issue=args.issue,
        source_title=metadata["source_title"],
        generated_at=generated_at,
        claimed_open_issues=metadata["claimed_open_issues"],
        claimed_open_prs=metadata["claimed_open_prs"],
        live_open_issues=metadata["live_open_issues"],
        live_open_prs=metadata["live_open_prs"],
        issue_rows=issue_rows,
        pr_rows=pr_rows,
    )
    output_path = Path(args.output)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(report, encoding="utf-8")
    print(output_path)
    return 0


if __name__ == "__main__":
    raise SystemExit(main())