From 804536a3f2bf2faa2642fce0eabbe3861dbcecf1 Mon Sep 17 00:00:00 2001 From: Bezalel Date: Tue, 7 Apr 2026 14:52:58 +0000 Subject: [PATCH] feat(security): add fleet merge-review audit script (#1098) --- scripts/audit_merge_reviews.py | 167 +++++++++++++++++++++++++++++++++ 1 file changed, 167 insertions(+) create mode 100644 scripts/audit_merge_reviews.py diff --git a/scripts/audit_merge_reviews.py b/scripts/audit_merge_reviews.py new file mode 100644 index 000000000..b7a1a1c11 --- /dev/null +++ b/scripts/audit_merge_reviews.py @@ -0,0 +1,167 @@ +#!/usr/bin/env python3 +""" +Fleet Merge Review Audit +======================== +Scans all Timmy_Foundation repos for merges in the last 7 days +and validates that each merged PR had at least one approving review. + +Exit 0 = no unreviewed merges +Exit 1 = unreviewed merges found (and issues created if --create-issues) + +Usage: + python scripts/audit_merge_reviews.py + python scripts/audit_merge_reviews.py --create-issues +""" + +import os +import sys +import argparse +from datetime import datetime, timedelta, timezone +import urllib.request +import urllib.error +import json + +GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com") +GITEA_TOKEN = os.getenv("GITEA_TOKEN", "") +ORG = "Timmy_Foundation" +DAYS_BACK = 7 +SECURITY_LABEL = "security" + + +def api_request(path: str) -> dict | list: + url = f"{GITEA_URL}/api/v1{path}" + req = urllib.request.Request(url, headers={ + "Authorization": f"token {GITEA_TOKEN}", + "Content-Type": "application/json", + }) + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode()) + + +def api_post(path: str, payload: dict) -> dict: + url = f"{GITEA_URL}/api/v1{path}" + data = json.dumps(payload).encode() + req = urllib.request.Request(url, data=data, headers={ + "Authorization": f"token {GITEA_TOKEN}", + "Content-Type": "application/json", + }) + with urllib.request.urlopen(req, timeout=30) as resp: + return json.loads(resp.read().decode()) + + +def get_repos() -> list[str]: + repos = [] + page = 1 + while True: + batch = api_request(f"/orgs/{ORG}/repos?limit=50&page={page}") + if not batch: + break + repos.extend([r["name"] for r in batch]) + page += 1 + return repos + + +def get_merged_prs(repo: str, since: str) -> list[dict]: + """Get closed (merged) PRs updated since `since` (ISO format).""" + prs = [] + page = 1 + while True: + batch = api_request( + f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}" + ) + if not batch: + break + for pr in batch: + if pr.get("merged_at") and pr["merged_at"] >= since: + prs.append(pr) + elif pr.get("updated_at") and pr["updated_at"] < since: + return prs + page += 1 + return prs + + +def get_reviews(repo: str, pr_number: int) -> list[dict]: + try: + return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews") + except urllib.error.HTTPError as e: + if e.code == 404: + return [] + raise + + +def create_post_mortem(repo: str, pr: dict) -> int | None: + title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}" + body = ( + f"## Unreviewed Merge Detected\n\n" + f"- **Repository:** `{ORG}/{repo}`\n" + f"- **PR:** #{pr['number']} — {pr['title']}\n" + f"- **Merged by:** @{pr.get('merged_by', {}).get('login', 'unknown')}\n" + f"- **Merged at:** {pr['merged_at']}\n" + f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n\n" + f"This merge had **zero approving reviews** at the time of merge.\n\n" + f"### Required Actions\n" + f"1. Validate the merge contents are safe.\n" + f"2. If malicious or incorrect, revert immediately.\n" + f"3. Document root cause (bypassed branch protection? direct push?).\n" + ) + try: + issue = api_post(f"/repos/{ORG}/the-nexus/issues", { + "title": title, + "body": body, + "labels": [SECURITY_LABEL], + }) + return issue.get("number") + except Exception as e: + print(f" FAILED to create issue: {e}") + return None + + +def main() -> int: + parser = argparse.ArgumentParser() + parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues") + args = parser.parse_args() + + if not GITEA_TOKEN: + print("ERROR: GITEA_TOKEN environment variable not set.") + return 1 + + since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK) + since = since_dt.isoformat() + + repos = get_repos() + print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n") + + unreviewed_count = 0 + for repo in repos: + merged = get_merged_prs(repo, since) + if not merged: + continue + + repo_unreviewed = [] + for pr in merged: + reviews = get_reviews(repo, pr["number"]) + approvals = [r for r in reviews if r.get("state") == "APPROVED"] + if not approvals: + repo_unreviewed.append(pr) + + if repo_unreviewed: + print(f"\n{repo}:") + for pr in repo_unreviewed: + print(f" ! UNREVIEWED merge: PR #{pr['number']} — {pr['title']} ({pr['merged_at'][:10]})") + unreviewed_count += 1 + if args.create_issues: + issue_num = create_post_mortem(repo, pr) + if issue_num: + print(f" → Created post-mortem issue the-nexus#{issue_num}") + + print(f"\n{'='*60}") + if unreviewed_count == 0: + print("All merges in the last 7 days had at least one approving review.") + return 0 + else: + print(f"Found {unreviewed_count} unreviewed merge(s).") + return 1 + + +if __name__ == "__main__": + raise SystemExit(main())