- Enhance review_gate.py to prevent rubber-stamping - Fix typo in GITEA_TOKEN environment variable - Add 3 checks: empty PR, approval, rubber-stamp - Closes #1445 (prevent rubber-stamping) Checks: 1. Empty PR - 0 additions, 0 deletions, 0 files 2. Approval - No APPROVED reviews 3. Rubber-stamp - Trivial PR + empty review comment Severity levels: - Error: Blocks merge - Warning: Allows merge with warning Addresses issue #1615: feat: prevent rubber-stamping of PRs with no changes Also fixes: - Typo in GITEA_TOKEN (was os.env...EN, now os.environ.get) - Enhanced error handling - Better output formatting
232 lines
6.7 KiB
Python
232 lines
6.7 KiB
Python
#!/usr/bin/env python3
|
|
"""
|
|
Review Gate — Poka-yoke for unreviewed merges.
|
|
Enhanced to prevent rubber-stamping of PRs with no changes.
|
|
|
|
Issue #1615: feat: prevent rubber-stamping of PRs with no changes (#1445)
|
|
|
|
Checks:
|
|
1. Empty PR - 0 additions, 0 deletions, 0 files
|
|
2. Approval - No APPROVED reviews
|
|
3. Rubber-stamp - Trivial PR + empty review comment
|
|
|
|
Usage in Gitea workflow:
|
|
- name: Review Approval Gate
|
|
run: python scripts/review_gate.py
|
|
env:
|
|
GITEA_TOKEN: ${{ secrets.GITEA_TOKEN }}
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import json
|
|
from urllib import request, error
|
|
|
|
# Configuration
|
|
GITEA_TOKEN = os.environ.get("GITEA_TOKEN", "")
|
|
GITEA_URL = os.environ.get("GITEA_URL", "https://forge.alexanderwhitestone.com")
|
|
REPO = os.environ.get("GITEA_REPO", "")
|
|
PR_NUMBER = os.environ.get("PR_NUMBER", "")
|
|
|
|
# Thresholds
|
|
MIN_APPROVALS = 1
|
|
TRIVIAL_THRESHOLD = 10 # Lines changed
|
|
|
|
|
|
def api_call(method, path):
|
|
"""Make authenticated Gitea API call."""
|
|
url = f"{GITEA_URL}/api/v1{path}"
|
|
headers = {"Authorization": f"token {GITEA_TOKEN}"}
|
|
req = request.Request(url, method=method, headers=headers)
|
|
try:
|
|
with request.urlopen(req, timeout=30) as resp:
|
|
return json.loads(resp.read().decode())
|
|
except error.HTTPError as e:
|
|
return {"error": e.read().decode(), "status": e.code}
|
|
|
|
|
|
def get_pr_details(repo, pr_number):
|
|
"""Get PR details including diff stats."""
|
|
return api_call("GET", f"/repos/{repo}/pulls/{pr_number}")
|
|
|
|
|
|
def get_pr_files(repo, pr_number):
|
|
"""Get list of files changed in PR."""
|
|
return api_call("GET", f"/repos/{repo}/pulls/{pr_number}/files")
|
|
|
|
|
|
def get_pr_reviews(repo, pr_number):
|
|
"""Get reviews for a PR."""
|
|
return api_call("GET", f"/repos/{repo}/pulls/{pr_number}/reviews")
|
|
|
|
|
|
def check_empty_pr(pr_details, pr_files):
|
|
"""Check if PR has no actual changes."""
|
|
additions = pr_details.get("additions", 0)
|
|
deletions = pr_details.get("deletions", 0)
|
|
changed_files = pr_details.get("changed_files", 0)
|
|
|
|
if additions == 0 and deletions == 0 and changed_files == 0:
|
|
return {
|
|
"check": "empty_pr",
|
|
"passed": False,
|
|
"message": f"EMPTY PR: {additions} additions, {deletions} deletions, {changed_files} files",
|
|
"severity": "error"
|
|
}
|
|
|
|
return {
|
|
"check": "empty_pr",
|
|
"passed": True,
|
|
"message": f"PR has changes: {additions} additions, {deletions} deletions, {changed_files} files",
|
|
"severity": "info"
|
|
}
|
|
|
|
|
|
def check_approvals(reviews):
|
|
"""Check if PR has required approvals."""
|
|
approvals = [r for r in reviews if r.get("state") == "APPROVED"]
|
|
|
|
if len(approvals) >= MIN_APPROVALS:
|
|
return {
|
|
"check": "approvals",
|
|
"passed": True,
|
|
"message": f"PR has {len(approvals)} approving review(s)",
|
|
"severity": "info"
|
|
}
|
|
else:
|
|
return {
|
|
"check": "approvals",
|
|
"passed": False,
|
|
"message": f"PR has {len(approvals)} approving review(s), need {MIN_APPROVALS}",
|
|
"severity": "error"
|
|
}
|
|
|
|
|
|
def check_rubber_stamp(pr_details, reviews):
|
|
"""Check for rubber-stamping (trivial PR + empty review comment)."""
|
|
additions = pr_details.get("additions", 0)
|
|
deletions = pr_details.get("deletions", 0)
|
|
total_changes = additions + deletions
|
|
|
|
# Check if PR is trivial (small changes)
|
|
is_trivial = total_changes <= TRIVIAL_THRESHOLD
|
|
|
|
# Check if any approval has empty comment
|
|
rubber_stamp = False
|
|
for review in reviews:
|
|
if review.get("state") == "APPROVED":
|
|
body = review.get("body", "").strip()
|
|
if not body:
|
|
rubber_stamp = True
|
|
break
|
|
|
|
if is_trivial and rubber_stamp:
|
|
return {
|
|
"check": "rubber_stamp",
|
|
"passed": False,
|
|
"message": f"RUBBER-STAMP WARNING: Trivial PR ({total_changes} lines) with empty approval comment",
|
|
"severity": "warning"
|
|
}
|
|
|
|
return {
|
|
"check": "rubber_stamp",
|
|
"passed": True,
|
|
"message": "No rubber-stamping detected",
|
|
"severity": "info"
|
|
}
|
|
|
|
|
|
def main():
|
|
"""Main entry point."""
|
|
if not GITEA_TOKEN:
|
|
print("ERROR: GITEA_TOKEN not set")
|
|
sys.exit(1)
|
|
|
|
if not REPO:
|
|
print("ERROR: GITEA_REPO not set")
|
|
sys.exit(1)
|
|
|
|
pr_number = PR_NUMBER
|
|
if not pr_number:
|
|
# Try to infer from Gitea Actions environment
|
|
pr_number = os.environ.get("GITEA_PULL_REQUEST_INDEX", "")
|
|
|
|
if not pr_number:
|
|
print("ERROR: Could not determine PR number")
|
|
sys.exit(1)
|
|
|
|
print(f"Review Gate: Checking PR #{pr_number} in {REPO}")
|
|
print("=" * 60)
|
|
|
|
# Get PR details
|
|
pr_details = get_pr_details(REPO, pr_number)
|
|
if isinstance(pr_details, dict) and "error" in pr_details:
|
|
print(f"ERROR fetching PR details: {pr_details}")
|
|
sys.exit(1)
|
|
|
|
# Get PR files
|
|
pr_files = get_pr_files(REPO, pr_number)
|
|
if isinstance(pr_files, dict) and "error" in pr_files:
|
|
print(f"ERROR fetching PR files: {pr_files}")
|
|
sys.exit(1)
|
|
|
|
# Get PR reviews
|
|
reviews = get_pr_reviews(REPO, pr_number)
|
|
if isinstance(reviews, dict) and "error" in reviews:
|
|
print(f"ERROR fetching reviews: {reviews}")
|
|
sys.exit(1)
|
|
|
|
# Run checks
|
|
checks = []
|
|
|
|
# Check 1: Empty PR
|
|
empty_check = check_empty_pr(pr_details, pr_files)
|
|
checks.append(empty_check)
|
|
|
|
# Check 2: Approvals
|
|
approval_check = check_approvals(reviews)
|
|
checks.append(approval_check)
|
|
|
|
# Check 3: Rubber-stamping
|
|
rubber_check = check_rubber_stamp(pr_details, reviews)
|
|
checks.append(rubber_check)
|
|
|
|
# Print results
|
|
print("\nCheck Results:")
|
|
print("-" * 60)
|
|
|
|
errors = 0
|
|
warnings = 0
|
|
|
|
for check in checks:
|
|
status = "✅ PASS" if check["passed"] else "❌ FAIL"
|
|
if check["severity"] == "warning" and check["passed"]:
|
|
status = "⚠️ WARN"
|
|
|
|
print(f"{status} [{check['check']}] {check['message']}")
|
|
|
|
if not check["passed"]:
|
|
if check["severity"] == "error":
|
|
errors += 1
|
|
elif check["severity"] == "warning":
|
|
warnings += 1
|
|
|
|
print("-" * 60)
|
|
|
|
# Final decision
|
|
if errors > 0:
|
|
print(f"\n❌ BLOCKED: {errors} error(s), {warnings} warning(s)")
|
|
print("Merges are not permitted until errors are resolved.")
|
|
sys.exit(1)
|
|
elif warnings > 0:
|
|
print(f"\n⚠️ WARNING: {warnings} warning(s)")
|
|
print("PR can merge but review warnings above.")
|
|
sys.exit(0)
|
|
else:
|
|
print(f"\n✅ OK: All checks passed")
|
|
print("PR is ready for merge.")
|
|
sys.exit(0)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main() |