forked from Timmy_Foundation/the-nexus
feat(security): add fleet merge-review audit script (#1098)
This commit is contained in:
167
scripts/audit_merge_reviews.py
Normal file
167
scripts/audit_merge_reviews.py
Normal file
@@ -0,0 +1,167 @@
|
||||
#!/usr/bin/env python3
|
||||
"""
|
||||
Fleet Merge Review Audit
|
||||
========================
|
||||
Scans all Timmy_Foundation repos for merges in the last 7 days
|
||||
and validates that each merged PR had at least one approving review.
|
||||
|
||||
Exit 0 = no unreviewed merges
|
||||
Exit 1 = unreviewed merges found (and issues created if --create-issues)
|
||||
|
||||
Usage:
|
||||
python scripts/audit_merge_reviews.py
|
||||
python scripts/audit_merge_reviews.py --create-issues
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import argparse
|
||||
from datetime import datetime, timedelta, timezone
|
||||
import urllib.request
|
||||
import urllib.error
|
||||
import json
|
||||
|
||||
GITEA_URL = os.getenv("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
||||
GITEA_TOKEN = os.getenv("GITEA_TOKEN", "")
|
||||
ORG = "Timmy_Foundation"
|
||||
DAYS_BACK = 7
|
||||
SECURITY_LABEL = "security"
|
||||
|
||||
|
||||
def api_request(path: str) -> dict | list:
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
req = urllib.request.Request(url, headers={
|
||||
"Authorization": f"token {GITEA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def api_post(path: str, payload: dict) -> dict:
|
||||
url = f"{GITEA_URL}/api/v1{path}"
|
||||
data = json.dumps(payload).encode()
|
||||
req = urllib.request.Request(url, data=data, headers={
|
||||
"Authorization": f"token {GITEA_TOKEN}",
|
||||
"Content-Type": "application/json",
|
||||
})
|
||||
with urllib.request.urlopen(req, timeout=30) as resp:
|
||||
return json.loads(resp.read().decode())
|
||||
|
||||
|
||||
def get_repos() -> list[str]:
|
||||
repos = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = api_request(f"/orgs/{ORG}/repos?limit=50&page={page}")
|
||||
if not batch:
|
||||
break
|
||||
repos.extend([r["name"] for r in batch])
|
||||
page += 1
|
||||
return repos
|
||||
|
||||
|
||||
def get_merged_prs(repo: str, since: str) -> list[dict]:
|
||||
"""Get closed (merged) PRs updated since `since` (ISO format)."""
|
||||
prs = []
|
||||
page = 1
|
||||
while True:
|
||||
batch = api_request(
|
||||
f"/repos/{ORG}/{repo}/pulls?state=closed&sort=updated&direction=desc&limit=50&page={page}"
|
||||
)
|
||||
if not batch:
|
||||
break
|
||||
for pr in batch:
|
||||
if pr.get("merged_at") and pr["merged_at"] >= since:
|
||||
prs.append(pr)
|
||||
elif pr.get("updated_at") and pr["updated_at"] < since:
|
||||
return prs
|
||||
page += 1
|
||||
return prs
|
||||
|
||||
|
||||
def get_reviews(repo: str, pr_number: int) -> list[dict]:
|
||||
try:
|
||||
return api_request(f"/repos/{ORG}/{repo}/pulls/{pr_number}/reviews")
|
||||
except urllib.error.HTTPError as e:
|
||||
if e.code == 404:
|
||||
return []
|
||||
raise
|
||||
|
||||
|
||||
def create_post_mortem(repo: str, pr: dict) -> int | None:
|
||||
title = f"[SECURITY] Unreviewed merge detected: {repo}#{pr['number']}"
|
||||
body = (
|
||||
f"## Unreviewed Merge Detected\n\n"
|
||||
f"- **Repository:** `{ORG}/{repo}`\n"
|
||||
f"- **PR:** #{pr['number']} — {pr['title']}\n"
|
||||
f"- **Merged by:** @{pr.get('merged_by', {}).get('login', 'unknown')}\n"
|
||||
f"- **Merged at:** {pr['merged_at']}\n"
|
||||
f"- **Commit:** `{pr.get('merge_commit_sha', 'n/a')}`\n\n"
|
||||
f"This merge had **zero approving reviews** at the time of merge.\n\n"
|
||||
f"### Required Actions\n"
|
||||
f"1. Validate the merge contents are safe.\n"
|
||||
f"2. If malicious or incorrect, revert immediately.\n"
|
||||
f"3. Document root cause (bypassed branch protection? direct push?).\n"
|
||||
)
|
||||
try:
|
||||
issue = api_post(f"/repos/{ORG}/the-nexus/issues", {
|
||||
"title": title,
|
||||
"body": body,
|
||||
"labels": [SECURITY_LABEL],
|
||||
})
|
||||
return issue.get("number")
|
||||
except Exception as e:
|
||||
print(f" FAILED to create issue: {e}")
|
||||
return None
|
||||
|
||||
|
||||
def main() -> int:
|
||||
parser = argparse.ArgumentParser()
|
||||
parser.add_argument("--create-issues", action="store_true", help="Auto-create post-mortem issues")
|
||||
args = parser.parse_args()
|
||||
|
||||
if not GITEA_TOKEN:
|
||||
print("ERROR: GITEA_TOKEN environment variable not set.")
|
||||
return 1
|
||||
|
||||
since_dt = datetime.now(timezone.utc) - timedelta(days=DAYS_BACK)
|
||||
since = since_dt.isoformat()
|
||||
|
||||
repos = get_repos()
|
||||
print(f"Auditing {len(repos)} repos for merges since {since[:19]}Z...\n")
|
||||
|
||||
unreviewed_count = 0
|
||||
for repo in repos:
|
||||
merged = get_merged_prs(repo, since)
|
||||
if not merged:
|
||||
continue
|
||||
|
||||
repo_unreviewed = []
|
||||
for pr in merged:
|
||||
reviews = get_reviews(repo, pr["number"])
|
||||
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
|
||||
if not approvals:
|
||||
repo_unreviewed.append(pr)
|
||||
|
||||
if repo_unreviewed:
|
||||
print(f"\n{repo}:")
|
||||
for pr in repo_unreviewed:
|
||||
print(f" ! UNREVIEWED merge: PR #{pr['number']} — {pr['title']} ({pr['merged_at'][:10]})")
|
||||
unreviewed_count += 1
|
||||
if args.create_issues:
|
||||
issue_num = create_post_mortem(repo, pr)
|
||||
if issue_num:
|
||||
print(f" → Created post-mortem issue the-nexus#{issue_num}")
|
||||
|
||||
print(f"\n{'='*60}")
|
||||
if unreviewed_count == 0:
|
||||
print("All merges in the last 7 days had at least one approving review.")
|
||||
return 0
|
||||
else:
|
||||
print(f"Found {unreviewed_count} unreviewed merge(s).")
|
||||
return 1
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
raise SystemExit(main())
|
||||
Reference in New Issue
Block a user