#!/usr/bin/env python3 """ Fleet Merge Review Audit ======================== Scans all Timmy_Foundation repos for merges in the last 7 days and validates that each merged PR had at least one approving review. Exit 0 = no unreviewed merges Exit 1 = unreviewed merges found (and issues created if --create-issues) Usage: python scripts/audit_merge_reviews.py python scripts/audit_merge_reviews.py --create-issues """ import os import sys import argparse from datetime import datetime, timedelta, timezone import urllib.request import urllib.error import json GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com") GITEA_TOKEN = os.getenv("GITEA_TOKEN", "") ORG = "Timmy_Foundation" DAYS_BACK = 7 SECURITY_LABEL = "security" def api_request(path: str) -> dict | list: url = f"{GITEA_URL}/api/v1{path}" req = urllib.request.Request(url, headers={ "Authorization": f"token {GITEA_TOKEN}", "Content-Type": "application/json", }) with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) def api_post(path: str, payload: dict) -> dict: url = f"{GITEA_URL}/api/v1{path}" data = json.dumps(payload).encode() req = urllib.request.Request(url, data=data, headers={ "Authorization": f"token {GITEA_TOKEN}", "Content-Type": "application/json", }) with urllib.request.urlopen(req, timeout=30) as resp: return json.loads(resp.read().decode()) def get_repos() -> list[str]: repos = [] page = 1 while True: batch = api_request(f"/orgs/{ORG}/repos?limit=50&page={page}") if not batch: break repos.extend([r["name"] for r in batch]) page += 1 return repos def get_merged_prs(repo: str, since: str) -> list[dict]: """Get closed (merged) PRs updated since `since` (ISO format).""" prs = [] page = 1 while True: batch = api_request( f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}" ) if not batch: break for pr in batch: if pr.get("merged_at") and pr["merged_at"] >= since: prs.append(pr) elif pr.get("updated_at") and pr["updated_at"] < since: return prs page += 1 return prs def get_reviews(repo: str, pr_number: int) -> list[dict]: try: return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews") except urllib.error.HTTPError as e: if e.code == 404: return [] raise def create_post_mortem(repo: str, pr: dict) -> int | None: title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}" body = ( f"## Unreviewed Merge Detected\n\n" f"- **Repository:** `{ORG}/{repo}`\n" f"- **PR:** #{pr['number']} — {pr['title']}\n" f"- **Merged by:** @{pr.get('merged_by', {}).get('login', 'unknown')}\n" f"- **Merged at:** {pr['merged_at']}\n" f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n\n" f"This merge had **zero approving reviews** at the time of merge.\n\n" f"### Required Actions\n" f"1. Validate the merge contents are safe.\n" f"2. If malicious or incorrect, revert immediately.\n" f"3. Document root cause (bypassed branch protection? direct push?).\n" ) try: issue = api_post(f"/repos/{ORG}/the-nexus/issues", { "title": title, "body": body, "labels": [SECURITY_LABEL], }) return issue.get("number") except Exception as e: print(f" FAILED to create issue: {e}") return None def main() -> int: parser = argparse.ArgumentParser() parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues") args = parser.parse_args() if not GITEA_TOKEN: print("ERROR: GITEA_TOKEN environment variable not set.") return 1 since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK) since = since_dt.isoformat() repos = get_repos() print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n") unreviewed_count = 0 for repo in repos: merged = get_merged_prs(repo, since) if not merged: continue repo_unreviewed = [] for pr in merged: reviews = get_reviews(repo, pr["number"]) approvals = [r for r in reviews if r.get("state") == "APPROVED"] if not approvals: repo_unreviewed.append(pr) if repo_unreviewed: print(f"\n{repo}:") for pr in repo_unreviewed: print(f" ! UNREVIEWED merge: PR #{pr['number']} — {pr['title']} ({pr['merged_at'][:10]})") unreviewed_count += 1 if args.create_issues: issue_num = create_post_mortem(repo, pr) if issue_num: print(f" → Created post-mortem issue the-nexus#{issue_num}") print(f"\n{'='*60}") if unreviewed_count == 0: print("All merges in the last 7 days had at least one approving review.") return 0 else: print(f"Found {unreviewed_count} unreviewed merge(s).") return 1 if __name__ == "__main__": raise SystemExit(main())